RocketMQ 操作方式(Python)
#! /usr/bin/python#encoding:utf-8from rocketmq.client import Producer, Messageimport jsonimport sysimport timereload(sys)sys.setdefaultencoding( "utf-8" )'''rocketmq写入消息'''def send():producer = Produc
·
#! /usr/bin/python
#encoding:utf-8
from rocketmq.client import Producer, Message
import json
import sys
import time
# Python 2 only: force the process-wide default string encoding to UTF-8 so
# implicit str/unicode conversions of the non-ASCII text in this script do not
# raise. Python 3 has no built-in reload() or sys.setdefaultencoding() and
# already defaults to UTF-8, so the hack is skipped there.
if sys.version_info[0] == 2:
    reload(sys)  # brings back sys.setdefaultencoding, which is removed at startup
    sys.setdefaultencoding("utf-8")
def send():
    """Publish one test JSON message to the ``test`` topic.

    Creates a producer in group ``test``, points it at the name server on
    127.0.0.1:9876, synchronously sends a single message and prints the
    status, message id and offset of the send result.

    The producer is shut down even when building or sending the message
    raises, so a failed send does not leave a started producer behind.
    """
    producer = Producer('test')
    producer.set_namesrv_addr('127.0.0.1:9876')  # name server address (host:port)
    producer.start()
    try:
        msg_body = {"id": "001", "name": "test_mq", "message": "abcdefg"}
        payload = json.dumps(msg_body).encode('utf-8')
        msg = Message('test')  # topic name
        msg.set_keys('ce')
        msg.set_tags('ce')
        msg.set_body(payload)  # message body
        retmq = producer.send_sync(msg)
        print(retmq.status, retmq.msg_id, retmq.offset)
    finally:
        producer.shutdown()
if __name__ == "__main__":
    # Print wall-clock timestamps around the run so the total time taken by
    # the 1000 sequential send() calls can be read off the output.
    # print(...) with a single argument behaves the same on Python 2 and 3.
    time_format = '%Y-%m-%d %H:%M:%S'
    print("开始时间:" + time.strftime(time_format))
    for _ in range(1000):
        send()
    print("结束时间:" + time.strftime(time_format))
#消费方式PullConsumer(全部消费)(可重复消费)
from rocketmq.client import PullConsumer
import json
# Print the id and body of each message yielded by consumer.pull('test').
consumer = PullConsumer('PID-001')
consumer.set_namesrv_addr('ip:port')  # placeholder: replace with the real name server host:port
consumer.start()
try:
    for msg in consumer.pull('test'):
        print(msg.id, msg.body)
finally:
    # Always release the consumer, even if pulling or printing fails.
    consumer.shutdown()
#! /usr/bin/python
#encoding:utf-8
from rocketmq.client import Producer, Message
from rocketmq.client import PullConsumer,PushConsumer
import json
import sys
import time
# Python 2 only: force the process-wide default string encoding to UTF-8 so
# implicit str/unicode conversions of the non-ASCII text in this script do not
# raise. Python 3 has no built-in reload() or sys.setdefaultencoding() and
# already defaults to UTF-8, so the hack is skipped there.
if sys.version_info[0] == 2:
    reload(sys)  # brings back sys.setdefaultencoding, which is removed at startup
    sys.setdefaultencoding("utf-8")
#消费方式PushConsumer不可重复消费
def callback(msg):
    """Print one message; registered as the handler for the ``test`` topic.

    Args:
        msg: The object the consumer passes in for each delivered message.
    """
    print(msg)
# Print the start time, then consume topic 'test' until the process is stopped.
# print(...) with a single argument behaves the same on Python 2 and 3.
print("开始时间:" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
consumer = PushConsumer('test')
consumer.set_namesrv_addr('127.0.0.1:9876')
consumer.subscribe("test", callback)
consumer.start()
try:
    # Keep the main thread alive; the subscribed callback does the printing.
    while True:
        time.sleep(30)
finally:
    # The loop never ends on its own, so without try/finally the shutdown and
    # the end-time print were unreachable. They now run when the loop is
    # interrupted (e.g. Ctrl-C); the interrupt itself still propagates.
    consumer.shutdown()
    print("结束时间:" + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
更多推荐
已为社区贡献3条内容
所有评论(0)