图示
其中P指producer,即生产者;C指consumer,即消费者。中间的红色表示消息队列,实例中表现为HELLO队列。
往队列里插入数据前,查看消息队列
$sudo rabbitmqctl list_queues
Listing queues ...
celeryev.db53a5e0-1e6a-4f06-a9f7-2c104c4612fb
...done.
插入消息队列代码
#in_queue.py
#coding=utf8
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel() #声明队列,如果消息发送到不存在的队列,rabbitmq会自动清除这些消息
channel.queue_declare(queue='HELLO') for i in range(10):
#exchange表示交换器,可以精确的制定消息应发到哪个队列,route_key设置队列的名称,body表示发送的内容
channel.basic_publish(exchange='', routing_key='HELLO', body='Hello World!' + str(i))
print " [%d] Sent 'Hello World!'" % i
#关闭连接
connection.close()
执行结果
$python in_queue.py
[0] Sent 'Hello World!'
[1] Sent 'Hello World!'
[2] Sent 'Hello World!'
[3] Sent 'Hello World!'
[4] Sent 'Hello World!'
[5] Sent 'Hello World!'
[6] Sent 'Hello World!'
[7] Sent 'Hello World!'
[8] Sent 'Hello World!'
[9] Sent 'Hello World!'
此时查看消息队列
$sudo rabbitmqctl list_queues
Listing queues ...
HELLO 10
celeryev.db53a5e0-1e6a-4f06-a9f7-2c104c4612fb 0
...done.
可以看到队列HELLO里面有10条数据。
读取消息队列数据
#out_queue.py
#coding=utf8
import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel() channel.queue_declare(queue='HELLO') def callback(ch, method, properties, body):
print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='HELLO', no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
执行结果
$python out_queue.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!0'
[x] Received 'Hello World!1'
[x] Received 'Hello World!2'
[x] Received 'Hello World!3'
[x] Received 'Hello World!4'
[x] Received 'Hello World!5'
[x] Received 'Hello World!6'
[x] Received 'Hello World!7'
[x] Received 'Hello World!8'
[x] Received 'Hello World!9'
此时查看消息队列
$sudo rabbitmqctl list_queues
Listing queues ...
HELLO 0
celeryev.db53a5e0-1e6a-4f06-a9f7-2c104c4612fb 0
...done.
可以看到队列HELLO中的数据被读走了,条数为0。
未完待续
http://www.01happy.com/ubuntu-rabbitmq-and-python-practice/