fastapi-后台任务、定时任务与消息队列
文章目录概述后台任务消息队列与定时任务在fastapi中初始化rearq创建定时任务使用任务引入日志概述有时候我们不可避免的要创建一些异步任务,这些任务与主线程无关或比较耗时不必立即反馈结果或不需要反馈结果。后台任务后台任务为fastapi自带的一种独立于主线程的执行方式,后台任务与请求回调很相似(甚至我觉得就是一样的,请求回调的方式请参考官方文档)。示例:from fastapi import
·
概述
有时候我们不可避免的要创建一些异步任务,这些任务与主线程无关或比较耗时不必立即反馈结果或不需要反馈结果。
后台任务
后台任务为fastapi自带的一种独立于主线程的执行方式,后台任务与请求回调很相似(甚至我觉得就是一样的,请求回调的方式请参考官方文档)。
示例:
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):# 提前定义好任务
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):# 请求一个后台任务的方法,
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
后台任务对应的是比较简单的一些情况。
消息队列与定时任务
这里我们使用到的是rearq,这个包是根据arq进行的二次开发,在api上面参考了celery,以做到更人性化和简单化。参考页面
在fastapi中初始化rearq
配置settings
# settings.py
#在settings文件夹里面配置rearq需要的参数,rearq使用redis
REARQ = {
"redis_host": REDIS_HOST,
"redis_port": REDIS_PORT,
"redis_password": REDIS_PASSWORD,
"redis_db": 1,
}
初始化
# 在项目的根目录__init__初始化rearq
from rearq import ReArq
from demo import settings# 上文的settings.py
rearq = ReArq(**settings.REARQ,)
启动startup和shutdown
# factory.py
#该文件主要是生成app和相关配置的,然后在main里面引入本文件,然后创建项目
from demo import rearq, settings
#在startup里面启动需要使用的相关组件(在定时任务里面使用了redis缓存和tortoise-orm异步orm,所以要提前启动)
@rearq.on_startup
async def on_startup():
await AsyncRedisUtil.init(**settings.REDIS)# redis异步操作的组件
await Tortoise.init(config=settings.TORTOISE_ORM)# tortoise-orm异步操作的组件
@rearq.on_shutdown
async def on_shutdown():
await AsyncRedisUtil.close()
await Tortoise.close_connections()
async def create_app():
...
创建定时任务
#tasks.py
from enum import Enum
class Queue(str, Enum): # 使用到的队列
task = "task"
withdraw = "withdraw"
notify = "notify"
pass_task = "pass_task"
@rearq.task(queue=Queue.withdraw.value, cron="*/5 * * * *")#每五分钟执行一次,使用的队列是withdraw
async def timing_withdraw(self):
"""
定时任务,self必须填,其他的为该方法需要的参数
:param self:
:return:
"""
...
return ...#会写入日志...吧?
@rearq.task(queue=Queue.task.value,)
async def check_receive_task(
self, uaid, task_id, uid, platform_category: db.TaskPlatformCategory, category: db.TaskCategory
):# 除了self,其他的参数都是在使用这个队列任务的时候需要填充的
...
return "未找到"
使用任务
在创建完任务之后,使用任务的方式如下:
#router.py
#接口
@router.get("/test_queue")
async def test_queue():
check_receive_task.delay(...)#这里填写需要的参数,这样就会放到消息队列里面等待执行
return {"msg":"OK"}
引入日志
rearq.log.init_logging(verbose)[source]
启动rearq
项目一般使用的是docker-compose进行管理,一般的项目结构大概如下
version: '3'
services:
demo:#主服务
build: .
env_file:
- .env
ports:
- '8008:8000'
depends_on:
- redis
image: demo
command: uvicorn demo.main:app --port 8000 --host 0.0.0.0
timing-worker:#启动定时任务服务
env_file:
- .env
image: demo
depends_on:
- demo
command: rearq worker demo.factory:rearq --timer
task-worker:#启动任务服务,每个queue单独启动
env_file:
- .env
image: demo
depends_on:
- demo
command: rearq worker demo.factory:rearq -q task
withdraw-worker:#启动任务服务,每个queue单独启动
env_file:
- .env
image: demo
depends_on:
- demo
command: rearq worker demo.factory:rearq -q withdraw
...
redis:#启动redis
hostname: redis
image: redis
volumes:
- ./redis/data:/data
非docker-compose的启动方式:
command: rearq worker demo.factory:rearq --timer
注意:定时任务要单独启动一个worker,每个queue也要启动一个worker,timer的工作是到时间把任务推到对应的queue里面去执行,所以必须单独启动一个。
总结
更具体的任务使用方式查看参考页面
也可以参考arq和celery的相关方法,有什么需求也可以到github的项目下留言
更多推荐
已为社区贡献5条内容
所有评论(0)