RabbitMQ 几种交换机的使用场景
一:默认交换机平均分配消息到custom, routing_key会将交换机的消息路由到指定的队列中消费者#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()chan
·
一:默认交换机
平均分配消息到custom, routing_key会将交换机的消息路由到指定的队列中
- 消费者
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume('hello', callback, False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
- 生产者
import pika import random # 新建连接 rabbitmq安装在本地则hostname为localhost hostname = 'localhost' parameters = pika.ConnectionParameters(hostname) connection = pika.BlockingConnection(parameters) # 创建通道 channel = connection.channel() # 声明一个队列 生产者和消费者都要声明一个相同的队列 用来防止万一某一方挂了 另一方还能正常运行 channel.queue_declare(queue='hello') number = random.randint(1, 100) body = f'Hello World:{number}' # 交换机; 队列名 写明将消息发往哪个队列 消息内容 channel.basic_publish(exchange='', routing_key='hello', body=body) print(f"[x] Sent {body}") connection.close()
二: 公平调度即当前消费者正在处理消息 管道(channel)将队列的消息指派给其他消费者
- 消费者
#!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print (' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body): print (" [x] Received %r" % (body,)) time.sleep( body.decode('utf-8').count('.') ) print (" [x] Done") # 消息重新入队列 当这个消费者挂掉之后 消息重新入队列 发送给其他消费者 ch.basic_ack(delivery_tag = method.delivery_tag) # 公平调度 channel.basic_qos(prefetch_count=1) channel.basic_consume('task_queue',callback) channel.start_consuming()
- 生产者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 队列持久化 channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" # delivery_mode 消息持久化 channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print (" [x] Sent %r" % (message,)) connection.close()
三:扇形交互机
将消息发送到它知道的所有的队列(忽略routing_key路由键的值),将消息发送到与交互机绑定的队列中
- 消费者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') # 当与消费者(consumer)断开连接的时候,这个队列应当被立即删除 result = channel.queue_declare(queue='' ,exclusive=True) queue_name = result.method.queue #n 队列绑定交换机 channel.queue_bind(exchange='logs', queue=queue_name) print('[*] Waiting for logs To exit press CTRL+C') def callback(ch, method, properties, body): print('[x] %r'%(body,)) channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming()
- 生产者
import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() # 定义交换机: 扇形交换机 channel.exchange_declare(exchange='logs', exchange_type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print (" [x] Sent %r" % (message,)) connection.close()
四:直连交换机
直连交互机可以指绑定一个队列 也可以通过routing_key绑定多个队列
比如:日志记录 error日志写入磁盘 而error, warning, info 时,都打印在输出台
- 消费者
#!/usr/bin/env python import pika, sys, os def main(): connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue # 获取运行py文件额外的参数 severities = sys.argv[1:] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body.decode())) channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming() if __name__ == '__main__': try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0)
- 生产者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 2 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish( exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:
$ python receive_logs_direct.py warning error > logs_from_rabbit.log
如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:
$ python receive_logs_direct.py info warning error
[*] Waiting for logs. To exit press CTRL+C
如果要触发一个error级别的日志,只需要输入:
$ python emit_log_direct.py error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'
更多推荐
所有评论(0)