Python之RabbitMQ操作

时间:2022-08-27 11:37:44

RabbitMQ是一个消息代理,从“生产者”接收消息并传递消息至“消费者”,期间可根据规则路由、缓存、持久化消息。“生产者”也即message发送者以下简称P,相对应的“消费者”乃message接收者以下简称C,message通过queue由P到C,queue存在于RabbitMQ,可存储尽可能多的message,多个P可向同一queue发送message,多个C可从同一个queue接收message。

实现的协议:AMQP。
 
术语(Jargon)
 
P,Producing,制造和发送信息的一方。
Queue,消息队列。
C,Consuming,接收消息的一方。
 
RabbitMQ安装
 安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang
$ yum -y install erlang 安装RabbitMQ
$ yum -y install rabbitmq-server

安装rabbitmq API

 pip install pika
or
easy_install pika
or
源码 https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue实现生产者消费者模型

 #!/usr/bin/env python
import pika # ######################### 生产者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
 #!/usr/bin/env python
import pika # ########################## 消费者 ########################## connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body) channel.basic_consume(callback,
queue='hello',
no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

1、acknowledgment 消息不丢失(订阅端消息不丢失)

no-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错 def callback(ch,method,properties,body):
print("[x] Received %r" %body)#打印获得消息的内容
ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息 channel.basic_consume(callback,queue='hai',no_ack=False)
#获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
# 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
# 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 print('[*]Waiting for messages to exit press CTRL+C')
channel.start_consuming()

消费者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai')#创建一个MQ队列,名称为 hai
channel.basic_publish(exchange='',routing_key='hai',body='hello world')
#向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
print("[x] Sent 'hello world' ")
connection.close()

生产者

2、durable   消息不丢失(服务端消息不丢失)

 # time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化 def callback(ch,method,properties,body):
print("[x] Received %r" %body)#打印获得消息的内容
ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息 channel.basic_consume(callback,queue='hai',no_ack=True)
#获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
# 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
# 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 print('[*]Waiting for messages to exit press CTRL+C')
channel.start_consuming()

消费者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
channel.basic_publish(exchange='',routing_key='hai',body='hello world',
properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
#向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
print("[x] Sent 'hello world' ")
connection.close()

生产者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照索引排列

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai,如果不创建,必须保证生产者已经创建,否则报错.durable=True是让消息持久化 def callback(ch,method,properties,body):
print("[x] Received %r" %body)#打印获得消息的内容
ch.basic_ack(delivery_tag=method.delivery_tag)#当no_ack=Fales的时候,使用该语句提醒服务端,消息执行完毕,服务端可以删除队列中的消息
channel.basic_qos(prefetch_count=1)#客户端按顺序去取,默认为奇偶数
channel.basic_consume(callback,queue='hai',no_ack=True)
#获取hai队列中的消息,获得消息后执行callback函数,no_ack参数默认值为False,表示客户端收到消息后是否等待确认消息,
# 如果为true,则表示获取数据后生产者将该消息删除,消费者如果丢失消息,无法找回,如果为false,则表示生产者等待消费者获取消息后等待生产者确认,
# 如果没有收到确认,则会重新将消息加入到队列中,消息不会丢失 print('[*]Waiting for messages to exit press CTRL+C')
channel.start_consuming()

消费者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接rabbitMQ服务器
channel=connection.channel()#创建频道,通过频道操作rabbitmq
channel.queue_declare(queue='hai',durable=True)#创建一个MQ队列,名称为 hai
channel.basic_publish(exchange='',routing_key='hai',body='hello world',
properties=pika.BasicProperties(delivery_mode=2,))#该语句作用为告诉rabbit服务器将消息持久化
#向hai队列中插入"hello world"消息。routing_key指定队列名称,body指定消息内容
print("[x] Sent 'hello world' ")
connection.close()

生产者

4、发布订阅

Python之RabbitMQ操作

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

exchange type = fanout

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
import sys
connection=pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8'))#连接消息队列服务器
channel=connection.channel()#创建频道,通过频道对rabbitmq进行操作 channel.exchange_declare(exchange='logs',type='fanout')#创建exchange,名称为logs,若果该消息队列已经创建,可以省略
message=''.join(sys.argv[1:]) or "info: Hello Wrold" channel.basic_publish(exchange='logs',routing_key='',body=message)#将消息添加今年队列
print('[x] sent %r'%message)
connection.close()

发布者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.8')) #连接消息队列服务器
channel = connection.channel()#创建频道,通过频道对rabbitmq进行操作 channel.exchange_declare(exchange='logs',#创建exchange,名称为logs
type='fanout')#type='fanout'作用为凡是和exchange相关联的队列,在用户给exchange发消息时,所有关联队列都会受到消息 result=channel.queue_declare(exclusive=True)#不指定队列名,有系统随机创建
queue_name=result.method.queue channel.queue_bind(exchange='logs',queue=queue_name)#将exchange和当前的消息队列做一个绑定
print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch,method,properties,body):
print('[x] %r' %body) channel.basic_consume(callback,queue=queue_name,no_ack=True)#在队列中获取消息 channel.start_consuming()

订阅者

5、关键字发送

exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.138'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') #severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
#message = ' '.join(sys.argv[2:]) or 'Hello World!'
severity='info'
message='test'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

发布者

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.8'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct')#设置exchange类型为direct result = channel.queue_declare(exclusive=True) #创建随机队列
queue_name = result.method.queue # severities = sys.argv[1:]
# if not severities:
# sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
# sys.exit(1)
severities=['error']
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)#绑定关键字 print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅在1

 #!/usr/bin/env python
# time:
# Auto:PANpan
# func:
#!/usr/bin/env python
# time:
# Auto:PANpan
# func:
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.0.0.8'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') result = channel.queue_declare( )
#声明queue,确认要从中接收message的queue
#queue_declare函数是幂等的,可运行多次,但只会创建一次
#若可以确信queue是已存在的,则此处可省略该声明,如producer已经生成了该queue
#但在producer和consumer中重复声明queue是一个好的习惯
#例如: channel.queue_declare(queue='hello')
queue_name = result.method.queue # severities = sys.argv[1:]
# if not severities:
# sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
# sys.exit(1)
severities=['error','info']
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅在2

6、模糊匹配

Python之RabbitMQ操作

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
 #!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1) for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅者

 #!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

发布者

注:

订阅/发布Demo
 
发送消息给多个订阅者
核心思想:消息发送给exchange,每个接收方创建匿名Queue绑定到exchange,exchange发送消息给每个接收方。
 
Exchanges
 
在RabbitMQ完整的模型中,消息只能发送给一个exchange。
exchange一方面接收消息,另一方面push给queues。
 
Python之RabbitMQ操作
exchange类型
> rabbitmqctl list_exchanges
direct
topic
headers
fanout 广播消息给已知队列
 

Python之RabbitMQ操作的更多相关文章

  1. Python之RabbitMQ的使用

    今天总结一下Python关于Rabbitmq的使用 RabbitMQ官网说明,其实也是一种队列,那和前面说的线程queue和进程queue有什么区别呢? 线程queue只能在同一个进程下进行数据交互 ...

  2. openresty 学习笔记番外篇:python访问RabbitMQ消息队列

    openresty 学习笔记番外篇:python访问RabbitMQ消息队列 python使用pika扩展库操作RabbitMQ的流程梳理. 客户端连接到消息队列服务器,打开一个channel. 客户 ...

  3. python高级之操作数据库

    python高级之操作数据库 本节内容 pymysql介绍及安装 使用pymysql执行sql 获取新建数据自增ID fetch数据类型设置 1.pymysql介绍及安装 在python2中连接数据库 ...

  4. 用 Python、 RabbitMQ 和 Nameko 实现微服务

    用 Python. RabbitMQ 和 Nameko 实现微服务 原创 07-17 17:57 首页 Linux中国 "微服务是一股新浪潮" - 现如今,将项目拆分成多个独立的. ...

  5. Python 文件常见操作

    # -*-coding:utf8 -*- ''''' Python常见文件操作示例 os.path 模块中的路径名访问函数 分隔 basename() 去掉目录路径, 返回文件名 dirname()  ...

  6. Python :open文件操作,配合read()使用!

    python:open/文件操作 open/文件操作f=open('/tmp/hello','w') #open(路径+文件名,读写模式) 如何打开文件 handle=open(file_name,a ...

  7. 第九篇:python高级之操作数据库

    python高级之操作数据库   python高级之操作数据库 本节内容 pymysql介绍及安装 使用pymysql执行sql 获取新建数据自增ID fetch数据类型设置 1.pymysql介绍及 ...

  8. (转载)Python 列表(list)操作

    (转载)http://blog.csdn.net/facevoid/article/details/5338048 创建列表sample_list = ['a',1,('a','b')] Python ...

  9. python之数据库操作(sqlite)

    python之数据库操作(sqlite) 不像常见的客户端/服务器结构范例,SQLite引擎不是个程序与之通信的独立进程,而是连接到程序中成为它的一个主要部分.所以主要的通信协议是在编程语言内的直接A ...

随机推荐

  1. Geolocation API JavaScript访问用户的当前位置信息

    Geolocation API在浏览器中的实现是navigator.geolocation对象,常用的有以下方法. 1.第一个方法是getCurrentPosition() 调用这个方法就会触发请求用 ...

  2. ABP dynamic API

    打开ABP的事例项目SimpleTaskSystem.WebSpaAngular 中LayoutView <!-- Dynamic scripts of ABP system (They are ...

  3. 基于Python的函数回归算法验证

    看机器学习看到了回归函数,看了一半看不下去了,看到能用方差进行函数回归,又手痒痒了,自己推公式写代码验证: 常见的最小二乘法是一阶函数回归回归方法就是寻找方差的最小值y = kx + bxi, yiy ...

  4. nginx 支持pathinfo的配置文件

    location ~ \.php { fastcgi_pass 127.0.0.1:9000; fastcgi_index index.php; include fastcgi_params; set ...

  5. js跟随鼠标移动的写法

    <script> window.onload=function(){ document.onmousemove=function (ev) { var oEvent=ev||event; ...

  6. 【生活】已经从官网购买iPad,单独购买AppleCare&plus;服务

    1 什么是AppleCare+服务 从苹果官网购买的硬件产品如ipad.iphone和MacBook等,官网承诺的保修期限是一年.AppleCare+是水果公司推出的一种保修服务,最大的特点就是将保修 ...

  7. java调用C&plus;&plus; DLL库方法

    最近一个项目要开发网页端人脸识别项目,人脸识别的算法已经写好,是C++版,但是网页端要求使用Java后台,这就涉及到Java调用DLL的问题.经过查找,实现了一个简单的例子. 1.第一步,先在Java ...

  8. S2&lowbar;SQL&lowbar;第三章

    3.1:修改表 3.1.1:修改表 语法: Alter table <旧表名> rename [ TO] <新表名>; 例子:Alter table `demo01` rena ...

  9. Django 系列博客(四)

    Django 系列博客(四) 前言 本篇博客介绍 django 如何和数据库进行交互并且通过 model 进行数据的增删查改 ORM简介 ORM全称是:Object Relational Mappin ...

  10. JavaScript之柯里化

    //未柯里化 function add(a,b){ return a + b; } //柯里化 function add(y){ return function(x){ console.log(y + ...