平时做渗透的时候,有时候给的是一些域名、一些 url 、一些 ip 或者三者都有,手动去一个个地打开比较浪费时间。我们需要用最短时间发现一些有趣的目标,如 xx 管理后台。于是让我们用 python 的协程来写个并发获取网站标题的工具吧,还可以顺便学习下协程的使用。
——人生苦短,我用python

在这里插入图片描述

1. 需求分析

先对工具做个需求分析:
可以并发获取标题,并且可以根据网络速度设置协程数目。
可以读取指定文件中的 url 、域名和 ip 来获取标题。
对于 ip 列表,需要支持 CIDR 格式的 ip 地址段,也就是可以解析如 192.168.1.0/24 这样的 C 段地址来获取标题。
可以把存在标题的网站输出到文件中,也就是80和443端口存在 web 应用的 url 和标题输出到指定的文件中。
程序具有复用性,也就是可以很方便地集成到以后开发的工具中。

2. 原理介绍

在开始开发前,先来解释下什么是协程,它和线程有什么区别。

协程,又称微线程,纤程,英文名Coroutine。协程在执行函数A时,可以随时中断,去执行函数B,接着继续执行函数A。但这一过程并不是函数调用,有点类似CPU的中断。这一整个过程看起来有点像多线程。

比如子程序A、B:

def A():
  print '1'
  print '2'
  print '3'def B():
  print 'x'
  print 'y'
  print 'z'

假设由协程执行,在执行A的过程中,可以随时中断,去执行B,B也可能在执行过程中中断再去执行A,结果可能是:

1
2
x
y
3
z

看起来A、B的执行有点像多线程,但协程的特点在于是一个线程执行,那和多线程比,协程有什么优势呢?

协程具有极高的执行效率。因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显。
因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。

协程主要用来处理异步 IO,异步 IO 是指非阻塞的资源读取。如在发起一个网络请求时,由于需要下载完数据才能完成读取,这需要一段时间,通常这个时候会一直处于阻塞状态,如果在UI线程中执行这种阻塞的操作,还会使程序卡死。

而异步 IO 中存在一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程: 遇到IO操作时,代码只负责发出IO请求,不等待IO结果,然后直接结束本轮消息处理,进入下一轮消息处理过程。当IO操作完成后,将收到一条“IO完成”的消息,处理该消息时就可以直接获取IO操作结果。

简单来说,可以把异步IO理解成一个圆形循环的工厂流水线,流水线上有一个工人,当工人拿到一个零件后,开始处理,接着遇到了个阻塞的操作,无法继续处理了,于是工人发出一个IO请求,然后把零件放回流水线上,去处理下一个零件。在之前阻塞的零件转回到工人面前时,该零件已经完成了IO请求,于是工人继续处理零件剩下的步骤。

asyncio 是用来编写并发代码的库,使用 async/await 语法, 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。asyncio 往往是构建 IO 密集型和高层级结构化网络代码的最佳选择。

在 python 3.5 以后,可以通过 async 关键字来定义一个函数为协程。然后通过 await 关键词来等待一个可等待对象,这个对象一般为协程。
如下代码所示, asyncio.sleep() 是一个可等待对象。

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

定义协程后,可以通过 asyncio.run() 运行传入的协程,此函数还负责管理 asyncio 事件循环并 完结异步生成器。

async def main():
  await asyncio.sleep(1)
  print('hello')
​
asyncio.run(main())

为了完成并发操作,我们可以创建多个任务来并发执行协程,如果需要同时运行20个协程,则可以通过创建20个任务来运行协程。

asyncio.create_task() 函数将协程 打包为一个 Task 排入日程准备执行。返回 Task 对象。当一个协程通过 asyncio.create_task() 等函数被打包为一个任务,该协程将自动排入日程准备立即运行:

import asyncio
​
async def nested():
  return 42async def main():
  # Schedule nested() to run soon concurrently
  # with "main()".
  task = asyncio.create_task(nested())# "task" can now be used to cancel "nested()", or
  # can simply be awaited to wait until it is complete:
  await task
​
asyncio.run(main())

注意,create_task() 函数在 Python 3.7 中被加入。在 Python 3.7 之前,可以改用低层级的 asyncio.ensure_future() 函数,但可读性不高。

要并发运行多个任务,可以使用
asyncio.gather(*aws, loop=None, return_exceptions=False) 方法,
该方法可并发运行 aws 序列中的可等待对象,直到所有任务都结束。下面是并发执行任务的一个例子:

import asyncio
import random
import time
​
​
async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()# Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)# Notify the queue that the "work item" has been processed.
        queue.task_done()print(f'{name} has slept for {sleep_for:.2f} seconds')
​
​
async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()# Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)# Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)# Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at
​
    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
​
​
asyncio.run(main())

maiI() 函数也是一个协程,通过最后一行中的 run() 方法来运行,函数中先创建了一个队列,接着往队列中放进了20个0.05至1之间的随机数。

接着通过 create_task() 方法创建了3个任务,这三个任务都用于执行 worker 协程。这三个任务被添加到 tasks 数组中。

其中的 worker() 函数是一个协程,该函数从队列中获取一个要处理的数据,在处理完后,调用 queue 的 task_done() 函数来通知该数据已经处理完。这样 queue 中已完成的任务数会减一。

此时3个任务已经开始并发运行,接着调用 queue.join() 来等待队列中的所有数据被处理完,也就是所有元素数量的 task_done() 被调用。

当队列中的数据被处理完后,把3个任务都取消,但由于任务的 cancel() 函数被调用后不会马上被取消,而是要等到下一个消息循环,所以需要调用 gather() 函数等待所有任务结束。

3. 工具实现

为了实现可重用性,我们可以创建个 WebTitle 类来运行任务。

构造函数中的 urls 为需要并发获取标题的 url, coroutine_count 为协程数目。result 是个字典,通过键值的方式存储 url 和相应的标题。

class WebTitle:
    def __init__(self, urls, coroutine_count=20):
        self.urls = urls
        self.coroutine_count = coroutine_count
        self.result = {}

接着定义个 start() 方法来启动并发获取 url 标题的协程任务, 其中的
asyncio.run() 启动一个消息循环并开始运行 self.start_task() 方法。

def start(self):
        asyncio.run(self.start_task())

start_task() 方法是一个协程, 先调用 self.init_queue() 方法来生成所有 url 的队列,然后根据指定的协程数来生成相应数目的task,每个task 都会运行 get_title() 函数 。接着调用 queue.join() 来等待队列中的所有 url 被处理完。url 被处理完后,把所有任务都取消,然后等待所有任务都取消完。

def init_queue(self):
        queue = asyncio.Queue()
        for url in self.urls:
            queue.put_nowait(url)
        return queue
    
async def start_task(self):
        queue = self.init_queue()
        tasks = []
        for i in range(self.coroutine_count):
            task = asyncio.create_task(self.get_title(queue))
            tasks.append(task)await queue.join()for task in tasks:
            task.cancel()await asyncio.gather(*tasks, return_exceptions=True)

get_title() 函数是一个协程,在一个 while 循环中一直取 url 出来处理,接着使用 aiohttp 库来获取网页源码。aiohttp 为异步 http 库,通过 await 来等待网络请求完成并获取网页源码。

获取完源码后调用 get_title_from_html() 函数来获取网页的标题,最后把 url 和标题保存在 result 字典中。

最后调用 queue.task_done() 来通知该 url 已经处理完成,以便前面的 queue.join() 函数最后可以解除阻塞。

 def get_title_from_html(self, html):
        title = 'not content!'
        title_patten = r'<title>(\s*?.*?\s*?)</title>'
        result = re.findall(title_patten, html)
        if len(result) >= 1:
            title = result[0]
            title = title.strip()
        return title
​
    async def get_title(self, queue):
        while True:
            url = await queue.get()
            print('get title for {}'.format(url))
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, timeout=3, ssl=ssl.SSLContext()) as resp:
                        html = await resp.text()
                title = self.get_title_from_html(html)
                print('{}:{}'.format(url,title))
                self.result[url] = title
            except Exception as e:
                print('{} has error: {} '.format(url,str(e)))                
            queue.task_done()

获取完网页标题后,把所有结果写进 csv 文件中。

def write_result(self, outfile):
        with open(outfile, 'w') as f:
            writer = csv.writer(f)
            writer.writerow(['url','title'])
            urls = self.result.keys()
            for url in urls:
                title = self.result[url]
                writer.writerow([url, title])
        print('result write to {}'.format(outfile))

到这里WebTitle 类就实现完成了,接下来写个 main() 函数来解析文件中的内容并生成 url 来给 webtitle 实例来获取标题。

def parse_args():
    parser = argparse.ArgumentParser(description='A tool that can get title for domains or urls')
    parser.add_argument('-d','--domain', metavar='domain.txt', dest='domain_file', type=str, help=u'domain to get title')
    parser.add_argument('-u','--url', metavar='url.txt', dest='url_file', type=str, help=u'urls to get title')
    parser.add_argument('-i','--ip', metavar='ip.txt', dest='ip_file', type=str, help=u'ips to get title')
    parser.add_argument('-t','--coroutine', metavar='20', dest='coroutine_count', type=int, default=20,help=u'coroutines to get title')
    parser.add_argument('-o','--outfile', metavar='result.txt', dest='outfile', type=str, default='result.csv',help=u'file to result')
    args = parser.parse_args()
    if args.url_file == None and args.domain_file == None and args.ip_file == None:
        parser.print_help()
        sys.exit()
    return args
​
​
def main():
    try:
        args = parse_args()
        urls = []if args.domain_file:
            with open(args.domain_file) as f:
                domains = f.readlines()
            for domain in domains:
                domain = domain.strip()
                if domain != '':
                    urls.append('http://' + domain)
                    urls.append('https://' + domain)if args.url_file:
            with open(args.url_file) as f:
                urls2 = f.readlines()
            for url in urls2:
                url = url.strip()
                if url != '':
                    urls.append(url)if args.ip_file:
            with open(args.ip_file) as f:
                ips = f.readlines()
            for ip in ips:
                ip = ip.strip()
                if ip != '':
                    cidr_ip = IPy.IP(ip)
                    for i in cidr_ip:
                        urls.append('http://' + str(i))
                        urls.append('https://' + str(i))
​
        web_title = WebTitle(urls, args.coroutine_count)
        web_title.start()
        web_title.write_result(args.outfile)
    except Exception as e:
        print(e)

4. 工具用法

该工具仅在 python 3.7 测试,可以稳定使用, python 3.8 还不是稳定版本,3.8 的协程有有 bug ,建议在 3.5 - 3.7 中使用。

optional arguments:
-h, --help            show this help message and exit
-d domain.txt, --domain domain.txt
domain to get title
-u url.txt, --url url.txt
urls to get title
-i ip.txt, --ip ip.txt
ips to get title
-t 20, --coroutine 20
coroutines to get title
-o result.txt, --outfile result.txt
file to result
​
# 指定要获取标题的域名列表文件
python3 web_title.py -d domain.txt
# 指定 url 文件,格式为 http://www.baidu.com
python3 web_title.py -u url.txt`在这里插入代码片`
# 指定 ip 文件,格式为 192.168.1.1 或 192.168.1.1/24
python3 web_title.py -i ip.txt
# 同时指定三种格式的文件
python3 web_title.py -i ip.txt -d domain.txt -u url.txt
# 指定协程数
python3 web_title.py -u url.txt -t 50

5. 工具使用截图

在这里插入图片描述

需要工具完整源码的,请在公众号后台回复 web_tilte 获取。

本文章也在我的公众号发布、
我的安全专家之路

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐