RabbitMQ 基于python的使用方法
RabbitMQ基于的原理之前已经讲过,每个组成部分也都有说明, 现在就开始学习如何使用了。从最基本的模型说起。P发送一个hello的消息给C。在这个事例中,并不需要考虑太多rabbitMQ部件上的设置。对于发送方P来说,需要做的就是定义好一个队列,使用默认的exchange。#!/usr/bin/env pythonimport pikaconnecti
RabbitMQ基于的原理之前已经讲过,每个组成部分也都有说明, 现在就开始学习如何使用了。
从最基本的模型说起。
P发送一个hello的消息给C。在这个事例中,并不需要考虑太多rabbitMQ部件上的设置。
对于发送方P来说,需要做的就是定义好一个队列,使用默认的exchange。
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
channel.basic_consume(callback,
queue='hello',
no_ack=True)
channel.start_consuming()
这程序是rabbitMQ的官方示例,跑起来毫无压力。
回过头看上面的例子,对于P来说,它定义了一个队列并把默认的exchange的binging规则中添加了一条朝向这个队列的routing key。
而对于C来说,并不关心exchange的消息,它只需要注册基于队列'hello'的回调函数即可。
第二个示例是通过RabbitMQ构建一个工作队列。工作队列的特点是多个Consumer获取同一个队列中的任务,各自完成后再继续下一个。
P 端:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
print " [x] Sent %r" % (message,)
connection.close()
从代码中看,变化并不大,在声明队列的使用用到了参数durable,这个参数的作用是将消息声明为持久化的,在被确认已经接收之后才删除,否则重启Consumer后还是会再次接收到消息。
C端:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
C端的变化则更为显著。首先声明了队列,保持与P端一致。
二是在basic_consume中去掉了auto_ack的负值,这时就需要C端手动调用回执函数basic_ack。如果没有调用这个函数,则消息即时处理过后依然会出现在下次的队列中。
下面是一个例子:
P方发送以下命令到消息队列
Hello World! 0
Hello World! 1
Hello World! 2
Hello World! 3
Hello World! 4
Hello World! 5
Hello World! 6
Hello World! 7
这时如果只运行一个C,在callback函数中打印得到的消息
CALLBACK: Hello World! 0
CALLBACK: Hello World! 1
CALLBACK: Hello World! 2
CALLBACK: Hello World! 3
CALLBACK: Hello World! 4
CALLBACK: Hello World! 5
CALLBACK: Hello World! 6
CALLBACK: Hello World! 7
关闭C后重新启动一个C,会发现上述消息又被重新接收了一遍,而且还是之前的顺序!
CALLBACK: Hello World! 0
CALLBACK: Hello World! 1
CALLBACK: Hello World! 2
CALLBACK: Hello World! 3
CALLBACK: Hello World! 4
CALLBACK: Hello World! 5
CALLBACK: Hello World! 6
CALLBACK: Hello World! 7
这说明,即使每条消息已经被某个C的Callback函数一次处理,它们还是按照原有的顺序保存在队列中。
第三处重要的改动是添加了下面的一行代码
channel.basic_qos(prefetch_count=1)
这行代码的作用是来指定每个C每次只从消息队列中挑选1个消息。
这就涉及到RabbitMQ中分发消息到多个C的一种策略: Fair dispatch。
这个策略规定了在消息分发时并不是看当前哪个C是空闲的,而是将当前所有的消息依次分给所有可用的C,如果某个C正在忙碌,它对应的消息也会停在队列中,不会被其他C抢走。这就意味着,如果消息队列中已经存在了很多消息,而当前的每个C都是忙碌的,这时就算添加新的C也是没有用的。添加新的C只会分配后再后来新加的消息,对于现存的消息是无法承接的。
回到刚才的例子,P依旧发出同样的消息,而设定2个C开启并接收。得到的结果如下:
C1:
CALLBACK: Hello World! 0
CALLBACK: Hello World! 2
CALLBACK: Hello World! 4
CALLBACK: Hello World! 6
C2:
CALLBACK: Hello World! 1
CALLBACK: Hello World! 3
CALLBACK: Hello World! 5
CALLBACK: Hello World! 7
即使我们设定每个C1中回调函数运行的时间比C2长,长很多,也还是会出现同样的结果。
只有在RabbitMQ侦测到某个C的连接中断(crash之类)之后,才会把原本分配给这个C的消息重新分配给其他的C。
在实际应用中,一般大家都不希望出现这种等待,所以采用basic_qos来声明每次每个C只获取一个消息,其它的消息等下一个C空闲时再分配。
这也就是官方文档上图的意思。
更多推荐
所有评论(0)