[Deprecated] Python asynchronous file I/O + asynchronous crawler
If you write asynchronous programs with coroutines in Python, you should avoid calling blocking APIs.
Unfortunately, Python's built-in file operations and the requests library only offer blocking APIs.
So I wrote a genuinely asynchronous crawler:
the network part uses the third-party library aiohttp;
the file I/O part is my own idea. The approach: move the file operations into another thread and communicate with it through a semaphore. If that is unfamiliar, see any operating systems textbook.
(2019/6/7)
It seems this AsyncFile implementation has a problem: every read/write operation creates a new ThreadPoolExecutor, so it is not reliable. Could .run_in_executor be the cause?
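For reference: when run_in_executor is called with executor=None, the work goes to the event loop's shared default executor rather than a fresh ThreadPoolExecutor per call, and the future it returns can simply be awaited, which makes a manual semaphore hand-off unnecessary. A minimal sketch of that simpler pattern (blocking_read / read_async are illustrative names, not part of the code below):

import asyncio

def blocking_read(path):
    # ordinary blocking read, executed in a worker thread
    with open(path, "rb") as f:
        return f.read()

async def read_async(path):
    loop = asyncio.get_event_loop()
    # executor=None reuses the loop's default ThreadPoolExecutor;
    # awaiting the returned future suspends this coroutine until
    # the blocking call finishes in the worker thread
    return await loop.run_in_executor(None, blocking_read, path)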
asyncFile.py
import asyncio
import io

class AsyncFile:
    class _ReadContent:
        '''Buffer for the data that gets read.
        read() runs in the worker thread and stores its result
        in _ReadContent().content.
        '''
        def __init__(self, content=None):
            self.content = content

    def __init__(self, path: str, open_flag: str = "r", executor=None):
        # file path
        self.path = path
        # mode passed to open()
        self.open_flag = open_flag
        # the underlying (blocking) file object
        self._f = open(path, open_flag)
        # the current event loop
        self._loop = asyncio.get_event_loop()
        # read/write lock: at most one reader or writer at a time
        self._rw_lock = asyncio.Lock()
        # a concurrent.futures ThreadPoolExecutor or ProcessPoolExecutor
        # (I am not sure a ProcessPoolExecutor is actually useful here);
        # defaults to None, in which case the loop's default executor is used
        self._executor = executor

    def _read(self, r_content: _ReadContent, over_semaphore: asyncio.Semaphore):
        # the blocking read, run in the worker thread
        r_content.content = self._f.read()
        # wake the parent coroutine from its wait queue
        over_semaphore.release()

    def _write(self, content, over_semaphore: asyncio.Semaphore):
        # the blocking write, run in the worker thread
        self._f.write(content)
        # wake the parent coroutine from its wait queue
        over_semaphore.release()

    async def read(self):
        if not self._f.readable():
            raise io.UnsupportedOperation()
        async with self._rw_lock:
            # ===============================================
            # over_semaphore signals whether the operation has finished
            over_semaphore = asyncio.Semaphore(0)
            _read_content = self._ReadContent()
            self._loop.run_in_executor(
                self._executor, self._read, _read_content, over_semaphore)
            # blocks while over_semaphore <= 0; continues only after the
            # worker thread calls release()
            await over_semaphore.acquire()
            # ===============================================
            return _read_content.content

    async def write(self, content):
        if not self._f.writable():
            raise io.UnsupportedOperation()
        async with self._rw_lock:
            # ===============================================
            # same idea as read()
            over_semaphore = asyncio.Semaphore(0)
            self._loop.run_in_executor(
                self._executor, self._write, content, over_semaphore)
            await over_semaphore.acquire()
            # ===============================================

    async def seek(self, offset, where=0):
        async with self._rw_lock:
            self._f.seek(offset, where)

    async def close(self):
        async with self._rw_lock:
            self._f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, traceback):
        try:
            self._f.close()
        finally:
            pass
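One likely source of the unreliability mentioned above: asyncio synchronization primitives such as asyncio.Semaphore are not thread-safe, so calling release() directly from the worker thread is not guaranteed to wake the waiting coroutine. The documented way to poke the event loop from another thread is loop.call_soon_threadsafe. A minimal self-contained sketch of that hand-off (the same change would apply inside AsyncFile._read and _write); this is an illustration, not a tested fix:

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def wait_for_worker(loop, executor):
    done = asyncio.Semaphore(0)

    def worker():
        # ... blocking work runs here, in the worker thread ...
        # release() itself is not thread-safe, so schedule it on the loop:
        loop.call_soon_threadsafe(done.release)

    loop.run_in_executor(executor, worker)
    await done.acquire()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(wait_for_worker(loop, ThreadPoolExecutor(max_workers=1)))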
asyncCrawler.py
from asyncFile import AsyncFile
import asyncio
import aiohttp
import time

async def get(session, url, timeout=60):
    async with session.request('GET', url, timeout=timeout) as resp:
        return await resp.read()

async def crawl(url, save_path, executor=None):
    async with aiohttp.ClientSession() as session:
        content = await get(session, url)
        if content:
            with AsyncFile(save_path, "wb", executor) as f:
                await f.write(content)

if __name__ == "__main__":
    import os
    from concurrent.futures import ThreadPoolExecutor

    d = "./_test_imgs/"
    if not os.path.exists(d):
        os.makedirs(d)
    url = "https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1551437241785&di=a827c7962549b54e2d4a84327902bf54&imgtype=0&src=http%3A%2F%2Fwww.baijingapp.com%2Fuploads%2Fcompany%2F03%2F36361%2F20170413%2F1492072091_pic_real.jpg"
    save_path = os.path.join(d, "tmp{}.jpg")
    executor = ThreadPoolExecutor(max_workers=8)
    tasks = []
    for i in range(20):
        tasks.append(crawl(url, save_path.format(i), executor))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print("over")