python实现redis5.0.8 流操作
#!/usr/bin/python# -*- coding: UTF-8 -*"""@file_name: stream.py@author:li-boss@create date: 2020/07/28 15:40@description:redis流操作"""import uuidclass Stream():"""redis流"""def __init__(self,stream_name=
·
作者:lizhonglin
github: https://github.com/Leezhonglin/
blog: https://leezhonglin.github.io
这段时间需要用到消息队列,查询了python有通过Celery来实现,但是遇到分布式的时候连接redis集群时.使用Celery会出现不能连接的问题.找了很多redis连接库,多多少少有各种问题. 最后了解到redis的流.完美的解决这个问题.一下是基本的实现.
#!/usr/bin/python
# -*- coding: UTF-8 -*
"""
@file_name: stream.py
@author:li-boss
@create date: 2020/07/28 15:40
@description:redis流操作
"""
import uuid
class Stream():
"""
redis流
"""
def __init__(self,stream_name='TASK_QUEUE',group_name = 'QUEUE'):
self.db = redis_client.get_conn()
self.stream = stream_name
self.add_stream(self.stream)
if not self.is_active_group(self.stream,group_name):
self.db.xgroup_create(self.stream, group_name, id=0)
def is_active_group(self,stream_name,group_name):
"""
验证是否有组
:param stream_name:
:param group_name:
:return:
"""
infos = self.db.xinfo_groups(stream_name)
for item in infos:
if item.get('name').decode()==group_name:
return True
return False
def add_stream(self,stream_name):
"""
添加流
:param stream_name:
:return:
"""
if not self.db.exists(stream_name):
id = self.db.xadd(stream_name, {'test': 'test'})
self.db.xdel(stream_name, id)
return
def add_msg(self,msg):
"""
添加消息
:param msg:
:return:
"""
return self.db.xadd(self.stream,msg)
def del_msg(self,msg_id):
"""
删除消息
:param msg_id:
:return:
"""
return self.db.xdel(self.stream,msg_id)
def get_stream_len(self):
"""
获取流的长度
:return:
"""
return self.db.xlen(self.stream)
def get_stream_list(self):
return self.db.xrange(self.stream)
class streamClient():
"""
流客户端
"""
def __init__(self,stream_name='TASK_QUEUE',group_name = 'QUEUE'):
self.db = redis_client.get_conn()
self.stream_name = stream_name
self.group = group_name
self.consumer_name ='user'+ str(uuid.uuid1())
def read_group(self, block=0):
"""
消费
:param block: block=0表示阻塞1,表示非阻塞
:return:
"""
return self.r.xreadgroup(self.group_name, self.customer, {self.stream_name: ">"}, count=1, block=block)
以上是自己总结的redis流操作,希望能帮助到大家.
更多推荐
已为社区贡献2条内容
所有评论(0)