一:默认交换机

平均分配消息到custom, routing_key会将交换机的消息路由到指定的队列中

  • 消费者
    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
    
    channel.basic_consume('hello', callback, False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
  • 生产者 
    import pika
    import random
    
    # 新建连接 rabbitmq安装在本地则hostname为localhost
    hostname = 'localhost'
    parameters = pika.ConnectionParameters(hostname)
    connection = pika.BlockingConnection(parameters)
    
    # 创建通道
    channel = connection.channel()
    # 声明一个队列 生产者和消费者都要声明一个相同的队列 用来防止万一某一方挂了 另一方还能正常运行
    channel.queue_declare(queue='hello')
    
    number = random.randint(1, 100)
    body = f'Hello World:{number}'
    
    # 交换机; 队列名  写明将消息发往哪个队列 消息内容
    channel.basic_publish(exchange='', routing_key='hello', body=body)
    print(f"[x] Sent {body}")
    connection.close()


二: 公平调度即当前消费者正在处理消息 管道(channel)将队列的消息指派给其他消费者

  • 消费者
    #!/usr/bin/env python
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='task_queue', durable=True)
    print (' [*] Waiting for messages. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print (" [x] Received %r" % (body,))
        time.sleep( body.decode('utf-8').count('.') )
        print (" [x] Done")
        # 消息重新入队列 当这个消费者挂掉之后 消息重新入队列  发送给其他消费者
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    # 公平调度
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume('task_queue',callback)
    
    channel.start_consuming()
  • 生产者
    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    # 队列持久化
    channel.queue_declare(queue='task_queue', durable=True)
    
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    # delivery_mode 消息持久化
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                             delivery_mode = 2, # make message persistent
                          ))
    print (" [x] Sent %r" % (message,))
    connection.close()

三:扇形交互机

将消息发送到它知道的所有的队列(忽略routing_key路由键的值),将消息发送到与交互机绑定的队列中

  • 消费者
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    # 当与消费者(consumer)断开连接的时候,这个队列应当被立即删除
    result = channel.queue_declare(queue='' ,exclusive=True)
    queue_name = result.method.queue
    #n 队列绑定交换机
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    print('[*] Waiting for logs To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print('[x] %r'%(body,))
        
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()
  • 生产者
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    # 定义交换机: 扇形交换机
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    
    print (" [x] Sent %r" % (message,))
    connection.close()

四:直连交换机

直连交互机可以指绑定一个队列 也可以通过routing_key绑定多个队列

比如:日志记录 error日志写入磁盘 而error, warning, info 时,都打印在输出台

  • 消费者
    #!/usr/bin/env python
    import pika, sys, os
    
    def main():
        connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
    
        channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    
        result = channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue
        # 获取运行py文件额外的参数
        severities = sys.argv[1:]
        if not severities:
            sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
            sys.exit(1)
    
        for severity in severities:
            channel.queue_bind(
                exchange='direct_logs', queue=queue_name, routing_key=severity)
    
        print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
        def callback(ch, method, properties, body):
            print(" [x] %r:%r" % (method.routing_key, body.decode()))
    
    
        channel.basic_consume(
            queue=queue_name, on_message_callback=callback, auto_ack=True)
    
        channel.start_consuming()
    
    
    if __name__ == '__main__':
        try:
            main()
        except KeyboardInterrupt:
            print('Interrupted')
            try:
                sys.exit(0)
            except SystemExit:
                os._exit(0)
  • 生产者
    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    
    severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(
        exchange='direct_logs', routing_key=severity, body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()

如果你希望只是保存warning和error级别的日志到磁盘,只需要打开控制台并输入:

$ python receive_logs_direct.py warning error > logs_from_rabbit.log

如果你希望所有的日志信息都输出到屏幕中,打开一个新的终端,然后输入:

$ python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C

如果要触发一个error级别的日志,只需要输入:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐