python协程学习——写个并发获取网站标题的工具
平时做渗透的时候,有时候给的是一些域名、一些 url 、一些 ip 或者三者都有,手动去一个个地打开比较浪费时间。我们需要用最短时间发现一些有趣的目标,如 xx 管理后台。于是让我们用 python 的协程来写个并发获取网站标题的工具吧,还可以顺便学习下协程的使用。——人生苦短,我用python1. 需求分析先对工具做个需求分析:可以并发获取标题,并且可以根据网络速度设置协程数目。可以读取指定文
平时做渗透的时候,有时候给的是一些域名、一些 url 、一些 ip 或者三者都有,手动去一个个地打开比较浪费时间。我们需要用最短时间发现一些有趣的目标,如 xx 管理后台。于是让我们用 python 的协程来写个并发获取网站标题的工具吧,还可以顺便学习下协程的使用。
——人生苦短,我用python
1. 需求分析
先对工具做个需求分析:
可以并发获取标题,并且可以根据网络速度设置协程数目。
可以读取指定文件中的 url 、域名和 ip 来获取标题。
对于 ip 列表,需要支持 CIDR 格式的 ip 地址段,也就是可以解析如 192.168.1.0/24 这样的 C 段地址来获取标题。
可以把存在标题的网站输出到文件中,也就是80和443端口存在 web 应用的 url 和标题输出到指定的文件中。
程序具有复用性,也就是可以很方便地集成到以后开发的工具中。
2. 原理介绍
在开始开发前,先来解释下什么是协程,它和线程有什么区别。
协程,又称微线程,纤程,英文名Coroutine。协程在执行函数A时,可以随时中断,去执行函数B,接着继续执行函数A。但这一过程并不是函数调用,有点类似CPU的中断。这一整个过程看起来有点像多线程。
比如子程序A、B:
def A():
print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'
假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:
1
2
x
y
3
z
看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有什么优势呢?
协程具有极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
协程主要用来处理异步 IO,异步 IO 是指非阻塞的资源读取。如在发起一个网络请求时,由于需要下载完数据才能完成读取,这需要一段时间,通常这个时候会一直处于阻塞状态,如果在UI线程中执行这种阻塞的操作,还会使程序卡死。
而异步 IO 中存在一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程: 遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。
简单来说,可以把异步IO理解成一个圆形循环的工厂流水线,流水线上有一个工人,当工人拿到一个零件后,开始处理,接着遇到了个阻塞的操作,无法继续处理了,于是工人发出一个IO请求,然后把零件放回流水线上,去处理下一个零件。在之前阻塞的零件转回到工人面前时,该零件已经完成了IO请求,于是工人继续处理零件剩下的步骤。
asyncio 是用来编写并发代码的库,使用 async/await 语法, 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。asyncio 往往是构建 IO 密集型和高层级结构化网络代码的最佳选择。
在 python 3.5 以后,可以通过 async 关键字来定义一个函数为协程。然后通过 await 关键词来等待一个可等待对象,这个对象一般为协程。
如下代码所示, asyncio.sleep() 是一个可等待对象。
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
定义协程后,可以通过 asyncio.run() 运行传入的协程,此函数还负责管理 asyncio 事件循环并 完结异步生成器。
async def main():
await asyncio.sleep(1)
print('hello')
asyncio.run(main())
为了完成并发操作,我们可以创建多个任务来并发执行协程,如果需要同时运行20个协程,则可以通过创建20个任务来运行协程。
asyncio.create_task() 函数将协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。当一个协程通过 asyncio.create_task() 等函数被打包为一个任务,该协程将自动排入日程准备立即运行:
import asyncio
async def nested():
return 42
async def main():
# Schedule nested() to run soon concurrently
# with "main()".
task = asyncio.create_task(nested())
# "task" can now be used to cancel "nested()", or
# can simply be awaited to wait until it is complete:
await task
asyncio.run(main())
注意,create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数,但可读性不高。
要并发运行多个任务,可以使用
asyncio.gather(*aws, loop=None, return_exceptions=False) 方法,
该方法可并发运行 aws 序列中的可等待对象,直到所有任务都结束。下面是并发执行任务的一个例子:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())
maiI() 函数也是一个协程,通过最后一行中的 run() 方法来运行,函数中先创建了一个队列,接着往队列中放进了20个0.05至1之间的随机数。
接着通过 create_task() 方法创建了3个任务,这三个任务都用于执行 worker 协程。这三个任务被添加到 tasks 数组中。
其中的 worker() 函数是一个协程,该函数从队列中获取一个要处理的数据,在处理完后,调用 queue 的 task_done() 函数来通知该数据已经处理完。这样 queue 中已完成的任务数会减一。
此时3个任务已经开始并发运行,接着调用 queue.join() 来等待队列中的所有数据被处理完,也就是所有元素数量的 task_done() 被调用。
当队列中的数据被处理完后,把3个任务都取消,但由于任务的 cancel() 函数被调用后不会马上被取消,而是要等到下一个消息循环,所以需要调用 gather() 函数等待所有任务结束。
3. 工具实现
为了实现可重用性,我们可以创建个 WebTitle 类来运行任务。
构造函数中的 urls 为需要并发获取标题的 url, coroutine_count 为协程数目。result 是个字典,通过键值的方式存储 url 和相应的标题。
class WebTitle:
def __init__(self, urls, coroutine_count=20):
self.urls = urls
self.coroutine_count = coroutine_count
self.result = {}
接着定义个 start() 方法来启动并发获取 url 标题的协程任务, 其中的
asyncio.run() 启动一个消息循环并开始运行 self.start_task() 方法。
def start(self):
asyncio.run(self.start_task())
start_task() 方法是一个协程, 先调用 self.init_queue() 方法来生成所有 url 的队列,然后根据指定的协程数来生成相应数目的task,每个task 都会运行 get_title() 函数 。接着调用 queue.join() 来等待队列中的所有 url 被处理完。url 被处理完后,把所有任务都取消,然后等待所有任务都取消完。
def init_queue(self):
queue = asyncio.Queue()
for url in self.urls:
queue.put_nowait(url)
return queue
async def start_task(self):
queue = self.init_queue()
tasks = []
for i in range(self.coroutine_count):
task = asyncio.create_task(self.get_title(queue))
tasks.append(task)
await queue.join()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
get_title() 函数是一个协程,在一个 while 循环中一直取 url 出来处理,接着使用 aiohttp 库来获取网页源码。aiohttp 为异步 http 库,通过 await 来等待网络请求完成并获取网页源码。
获取完源码后调用 get_title_from_html() 函数来获取网页的标题,最后把 url 和标题保存在 result 字典中。
最后调用 queue.task_done() 来通知该 url 已经处理完成,以便前面的 queue.join() 函数最后可以解除阻塞。
def get_title_from_html(self, html):
title = 'not content!'
title_patten = r'<title>(\s*?.*?\s*?)</title>'
result = re.findall(title_patten, html)
if len(result) >= 1:
title = result[0]
title = title.strip()
return title
async def get_title(self, queue):
while True:
url = await queue.get()
print('get title for {}'.format(url))
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=3, ssl=ssl.SSLContext()) as resp:
html = await resp.text()
title = self.get_title_from_html(html)
print('{}:{}'.format(url,title))
self.result[url] = title
except Exception as e:
print('{} has error: {} '.format(url,str(e)))
queue.task_done()
获取完网页标题后,把所有结果写进 csv 文件中。
def write_result(self, outfile):
with open(outfile, 'w') as f:
writer = csv.writer(f)
writer.writerow(['url','title'])
urls = self.result.keys()
for url in urls:
title = self.result[url]
writer.writerow([url, title])
print('result write to {}'.format(outfile))
到这里WebTitle 类就实现完成了,接下来写个 main() 函数来解析文件中的内容并生成 url 来给 webtitle 实例来获取标题。
def parse_args():
parser = argparse.ArgumentParser(description='A tool that can get title for domains or urls')
parser.add_argument('-d','--domain', metavar='domain.txt', dest='domain_file', type=str, help=u'domain to get title')
parser.add_argument('-u','--url', metavar='url.txt', dest='url_file', type=str, help=u'urls to get title')
parser.add_argument('-i','--ip', metavar='ip.txt', dest='ip_file', type=str, help=u'ips to get title')
parser.add_argument('-t','--coroutine', metavar='20', dest='coroutine_count', type=int, default=20,help=u'coroutines to get title')
parser.add_argument('-o','--outfile', metavar='result.txt', dest='outfile', type=str, default='result.csv',help=u'file to result')
args = parser.parse_args()
if args.url_file == None and args.domain_file == None and args.ip_file == None:
parser.print_help()
sys.exit()
return args
def main():
try:
args = parse_args()
urls = []
if args.domain_file:
with open(args.domain_file) as f:
domains = f.readlines()
for domain in domains:
domain = domain.strip()
if domain != '':
urls.append('http://' + domain)
urls.append('https://' + domain)
if args.url_file:
with open(args.url_file) as f:
urls2 = f.readlines()
for url in urls2:
url = url.strip()
if url != '':
urls.append(url)
if args.ip_file:
with open(args.ip_file) as f:
ips = f.readlines()
for ip in ips:
ip = ip.strip()
if ip != '':
cidr_ip = IPy.IP(ip)
for i in cidr_ip:
urls.append('http://' + str(i))
urls.append('https://' + str(i))
web_title = WebTitle(urls, args.coroutine_count)
web_title.start()
web_title.write_result(args.outfile)
except Exception as e:
print(e)
4. 工具用法
该工具仅在 python 3.7 测试,可以稳定使用, python 3.8 还不是稳定版本,3.8 的协程有有 bug ,建议在 3.5 - 3.7 中使用。
optional arguments:
-h, --help show this help message and exit
-d domain.txt, --domain domain.txt
domain to get title
-u url.txt, --url url.txt
urls to get title
-i ip.txt, --ip ip.txt
ips to get title
-t 20, --coroutine 20
coroutines to get title
-o result.txt, --outfile result.txt
file to result
# 指定要获取标题的域名列表文件
python3 web_title.py -d domain.txt
# 指定 url 文件,格式为 http://www.baidu.com
python3 web_title.py -u url.txt`在这里插入代码片`
# 指定 ip 文件,格式为 192.168.1.1 或 192.168.1.1/24
python3 web_title.py -i ip.txt
# 同时指定三种格式的文件
python3 web_title.py -i ip.txt -d domain.txt -u url.txt
# 指定协程数
python3 web_title.py -u url.txt -t 50
5. 工具使用截图
需要工具完整源码的,请在公众号后台回复 web_tilte 获取。
本文章也在我的公众号发布、
更多推荐
所有评论(0)