进程QUEUE,可以是父进程与子进程间进行交互,也可以是同属于一个父进程的子进程间的交互;如果要实现进程A与进程B之间的通信,就需要借助一个中间进程了,我们习惯称为消息队列。
QQ无法直接与WORD通信,但是QQ可以把消息发给RabbitMQ,然后WORD从RabbitMQ里接收对应的消息,反之亦然,就实现了QQ和WORD的通信。
RabbitMQ可以维护多个消息队列。
1.RabbitMQ基本示例
provider
#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel() #声明queue,即声明一个消息队列
channel.queue_declare(queue='hello') #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
consumer
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel() #声明一个叫“hello”的消息队列,一定要跟provider端对应上;你可能会有疑问,在provider不是声明了嘛,consumer为什么还要声明,这是因为:如果先执行的provider程序,那这时候已经有“hello”这个消息队列了,consumer再执行的时候就可以从“hello”里取消息;但如果先执行的consumer,因为consumer没有声明“hello”这个消息队列,而provider也还没执行呢,所以consumer要从“hello”里取的时候就会报错。所以provider和consumer里都声明了这个消息队列,先执行谁都行,后执行的那个一看有了这个消息队列名,就不声明了。
channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body) channel.basic_consume(callback,
queue='hello',
no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')
#这个channel.start会一直接收消息,没有消息就在这卡住,不是仅接收一条。
channel.start_consuming()
2.RabbitMQ消息分发轮询
2.1 消息轮询
可以让生产者发送任务,消费者执行任务,消费者执行完任务后再将结果反馈给生产者,如此,就类似于C/S架构了。
RabbitMQ的消息队列是一个轮询机制来消费消息,比如先启动一个consumer1,再启动一个consumer2,再启动一个comsumer3,然后provider生产了一条“hello”,首先是comsumer1收到这个消息,然后provider又生产了一条“hello”,这次consumer2会收到,provider再生产了一条“hello”,这次consumer3会收到,依次按consumer注册顺序接收消息(consumer和provider必须是同一个queue);这个有点类似于负载均衡,各consumer权重相同。
2.2 no_ack
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body) channel.basic_consume(callback,
queue='hello',
no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')
#这个channel.start会一直接收消息,没有消息就在这卡住,不是仅接收一条。
channel.start_consuming() #no_ack=True,no_ack的全称是no_ackownledgement,也就是无需确认,这样无论consumer是否消费了这条消息(哪怕consumer端宕机了),rabbitmq都认为它已经消费完成了,不会把该消息给别的consumer,这样显然是不好的,所以我们一般不加“no_ack=true”,这样provider生产了一条消息,consumer1收到后中途宕机了,consumer2会继续收到这条消息,如果consumer2处理过程中也宕机了,那consumer3就会收到这条消息,这样可以保证这条消息被完整消费掉。
3. 消息持久化
3.1 队列持久化
provider和consumer声明消息队列时,添加一个durable参数,channel.queue_declare(queue
=
'hello'
, durable
=
True
),这样就实现了队列的持久化,比如队列里有“hello1”和“hello2”两个队列,此时rabbitmq服务宕了,再重新启动rabbitmq后,被持久化的队列依然存在,未被持久化的队列就不存在了。但是这仅仅是持久化了队列,队列里的消息没有被持久化。
3.2 消息持久化
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
4.RabbitMQ fanout广播模式
4.1 配置使客户端处理完一个消息才接受新的消息
如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。
这个只需要在客户端,即consumer上配置即可,代码:
#!/usr/bin/env python
import pika
import time connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) #在consumer端添加上此行
channel.basic_consume(callback,
queue='task_queue') channel.start_consuming()
4.2 广播模式
provider发一条消息,所有consumer都能收到这条消息。
之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了。
exchange可以理解为一个管道,从provider收取消息,然后把消息推送到消息队列里,exchange必须精确的知道在收到消息后做什么,是将消息挂起在某个特定队列,还是挂起到多个队列,还是被丢弃。这些规则根据exchange的类型来定。
Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息。
fanout: 所有bind到此exchange的queue都可以接收消息,纯广播;
direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息;
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息。
publisher
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()
subscriber
#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout') result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue channel.queue_bind(exchange='logs',
queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r" % body) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()
fanout模式下,provider就像收音机电台,各个consumer就像听众,只要听众开着自己的收音机(即consumer服务正常),就能收到provider的消息,一旦听众关了收音机(consumer崩了),错过的内容就无法重新接收了。
5.RabbitMQ direct广播模式
RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据关键字判定应该将数据发送至指定队列。
publisher
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
subscriber
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1) for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()
服务端:python server.py warning ;客户端:python client.py warning info 。客户端可以接收多个参数。
6.RabbitMQ topic细致的消息过滤广播模式
publisher
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
subscriber
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1) for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()
7.redis
http://www.cnblogs.com/wupeiqi/articles/5132791.html
redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
一、Redis安装和基本使用
1
2
3
4
|
wget http: / / download.redis.io / releases / redis - 3.0 . 6.tar .gz
tar xzf redis - 3.0 . 6.tar .gz
cd redis - 3.0 . 6
make |
启动服务端
1
|
src / redis - server
|
启动客户端
1
2
3
4
5
|
src / redis - cli
redis> set foo bar
OK redis> get foo "bar" |
二、Python操作Redis
1
2
3
4
5
6
7
|
sudo pip install redis or sudo easy_install redis or 源码安装 详见:https: / / github.com / WoLpH / redis - py
|
API使用
redis-py 的API的使用可以分类为:
- 连接方式
- 连接池
- 操作
- String 操作
- Hash 操作
- List 操作
- Set 操作
- Sort Set 操作
- 管道
- 发布订阅
1.操作模式
redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。
import redis redis_go = redis.Redis(host='192.168.0.30',port=6379)
redis_go.set('foo','')
print(redis_go.get('foo'))
2.连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。
import redis pool = redis.ConnectionPool(host='192.168.0.30',port=6379)
r = redis.Redis(connection_pool=pool)
r.set('foo','')
print(r.get('foo'))
3.操作
String操作,redis中的String在在内存中按照一个name对应一个value来存储。如图:
set(name, value, ex=None, px=None, nx=False, xx=False)
1
2
3
4
5
6
|
在Redis中设置值,默认,不存在则创建,存在则修改 参数: ex,过期时间(秒)
px,过期时间(毫秒)
nx,如果设置为True,则只有name不存在时,当前set操作才执行
xx,如果设置为True,则只有name存在时,岗前set操作才执行
|
setnx(name, value)
1
|
设置值,只有name不存在时,执行设置操作(添加) |
setex(name, value, time)
1
2
3
|
# 设置值 # 参数: # time,过期时间(数字秒 或 timedelta对象)
|
psetex(name, time_ms, value)
1
2
3
|
# 设置值 # 参数: # time_ms,过期时间(数字毫秒 或 timedelta对象)
|
mset(*args, **kwargs)
1
2
3
4
5
|
批量设置值 如: mset(k1= 'v1' , k2= 'v2' )
或
mget({ 'k1' : 'v1' , 'k2' : 'v2' })
|
get(name)
1
|
获取值 |
mget(keys, *args)
1
2
3
4
5
|
批量获取 如: mget( 'ylr' , 'wupeiqi' )
或
r.mget([ 'ylr' , 'wupeiqi' ])
|
getset(name, value)
1
|
设置新值并获取原来的值 |
getrange(key, start, end)
1
2
3
4
5
6
|
# 获取子序列(根据字节获取,非字符) # 参数: # name,Redis 的 name
# start,起始位置(字节)
# end,结束位置(字节)
# 如: "武沛齐" ,0-3表示 "武" |
setrange(name, offset, value)
1
2
3
4
|
# 修改字符串内容,从指定字符串索引开始向后替换(新值太长时,则向后添加) # 参数: # offset,字符串的索引,字节(一个汉字三个字节)
# value,要设置的值
|
setbit(name, offset, value)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
# 对name对应值的二进制表示的位进行操作 # 参数: # name,redis的name
# offset,位的索引(将值变换成二进制后再进行索引)
# value,值只能是 1 或 0
# 注:如果在Redis中有一个对应: n1 = "foo", 那么字符串foo的二进制表示为: 01100110 01101111 01101111
所以,如果执行 setbit( 'n1' , 7 , 1 ),则就会将第 7 位设置为 1 ,
那么最终二进制则变成 01100111 01101111 01101111 ,即: "goo"
# 扩展,转换二进制表示: # source = "武沛齐"
source = "foo"
for i in source:
num = ord (i)
print bin (num).replace( 'b' ,'')
特别的,如果source是汉字 "武沛齐" 怎么办?
答:对于utf - 8 ,每一个汉字占 3 个字节,那么 "武沛齐" 则有 9 个字节
对于汉字, for 循环时候会按照 字节 迭代,那么在迭代时,将每一个字节转换 十进制数,然后再将十进制数转换成二进制
11100110 10101101 10100110 11100110 10110010 10011011 11101001 10111101 10010000
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
武 沛 齐
|
getbit(name, offset)
1
|
# 获取name对应的值的二进制表示中的某位的值 (0或1) |
bitcount(key, start=None, end=None)
1
2
3
4
5
|
# 获取name对应的值的二进制表示中 1 的个数 # 参数: # key,Redis的name
# start,位起始位置
# end,位结束位置
|
1
|
# 返回name对应值的字节长度(一个汉字3个字节) |
incr(self, name, amount=1)
1
2
3
4
5
6
7
|
# 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 # 参数: # name,Redis的name
# amount,自增数(必须是整数)
# 注:同incrby |
incrbyfloat(self, name, amount=1.0)
1
2
3
4
5
|
# 自增 name对应的值,当name不存在时,则创建name=amount,否则,则自增。 # 参数: # name,Redis的name
# amount,自增数(浮点型)
|
decr(self, name, amount=1)
1
2
3
4
5
|
# 自减 name对应的值,当name不存在时,则创建name=amount,否则,则自减。 # 参数: # name,Redis的name
# amount,自减数(整数)
|
append(key, value)
1
2
3
4
5
|
# 在redis name对应的值后面追加内容 # 参数: key, redis的name
value, 要追加的字符串
|
7.4.Hash操作,redis中Hash在内存中的存储格式如下图:
hset(name, key, value)
1
2
3
4
5
6
7
8
9
|
# name对应的hash中设置一个键值对(不存在,则创建;否则,修改) # 参数: # name,redis的name
# key,name对应的hash中的key
# value,name对应的hash中的value
# 注: # hsetnx(name, key, value),当name对应的hash中不存在当前key时则创建(相当于添加)
|
hmset(name, mapping)
1
2
3
4
5
6
7
8
|
# 在name对应的hash中批量设置键值对 # 参数: # name,redis的name
# mapping,字典,如:{'k1':'v1', 'k2': 'v2'}
# 如: # r.hmset('xx', {'k1':'v1', 'k2': 'v2'})
|
hget(name,key)
1
|
# 在name对应的hash中获取根据key获取value |
hmget(name, keys, *args)
1
2
3
4
5
6
7
8
9
10
11
|
# 在name对应的hash中获取多个key的值 # 参数: # name,reids对应的name
# keys,要获取key集合,如:['k1', 'k2', 'k3']
# *args,要获取的key,如:k1,k2,k3
# 如: # r.mget('xx', ['k1', 'k2'])
# 或
# print r.hmget('xx', 'k1', 'k2')
|
hgetall(name)
1
|
获取name对应 hash 的所有键值
|
hlen(name)
1
|
# 获取name对应的hash中键值对的个数 |
hkeys(name)
1
|
# 获取name对应的hash中所有的key的值 |
hvals(name)
1
|
# 获取name对应的hash中所有的value的值 |
hexists(name, key)
1
|
# 检查name对应的hash是否存在当前传入的key |
hdel(name,*keys)
1
|
# 将name对应的hash中指定key的键值对删除 |
hincrby(name, key, amount=1)
1
2
3
4
5
|
# 自增name对应的hash中的指定key的值,不存在则创建key=amount # 参数: # name,redis中的name
# key, hash对应的key
# amount,自增数(整数)
|
hincrbyfloat(name, key, amount=1.0)
1
2
3
4
5
6
7
8
|
# 自增name对应的hash中的指定key的值,不存在则创建key=amount # 参数: # name,redis中的name
# key, hash对应的key
# amount,自增数(浮点数)
# 自增name对应的hash中的指定key的值,不存在则创建key=amount |
hscan(name, cursor=0, match=None, count=None)
1
2
3
4
5
6
7
8
9
10
11
12
13
|
# 增量式迭代获取,对于数据大的数据非常有用,hscan可以实现分片的获取数据,并非一次性将数据全部获取完,从而放置内存被撑爆 # 参数: # name,redis的name
# cursor,游标(基于游标分批取获取数据)
# match,匹配指定key,默认None 表示所有的key
# count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
# 如: # 第一次:cursor1, data1 = r.hscan('xx', cursor=0, match=None, count=None)
# 第二次:cursor2, data1 = r.hscan('xx', cursor=cursor1, match=None, count=None)
# ...
# 直到返回值cursor的值为0时,表示数据已经通过分片获取完毕
|
hscan_iter(name, match=None, count=None)
1
2
3
4
5
6
7
8
9
|
# 利用yield封装hscan创建生成器,实现分批去redis中获取数据 # 参数: # match,匹配指定key,默认None 表示所有的key
# count,每次分片最少获取个数,默认None表示采用Redis的默认分片个数
# 如: # for item in r.hscan_iter('xx'):
# print item
|
7.5 Redis集合set和有序集合操作
7.5.1 列表
,lpush是从队列左边插入数据,然后从左端开始抽取数据,所以用lrange查看names列表时,最先进去的最后才出来了,先进后出;range是从队列右边插入数据,然后也是从左端开始抽取数据,所以就是先进先出。
7.5.2 集合
因为是集合,所以插入重复的数据,只会插入一次;而且集合是无序的,所以不能切片取数据。
更多内容参见上面的wupeiqi网址
7.5.3 有序集合
插入的时候定了序号,然后zrange按序号大小排列。
7.5.4 管道
redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。
#!/usr/bin/env python
# -*- coding:utf-8 -*- import redis pool = redis.ConnectionPool(host='10.211.55.4', port=6379) r = redis.Redis(connection_pool=pool) # pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True) pipe.set('name', 'alex') #不会真的执行,只是把命令放到了管道
pipe.set('role', 'sb') #不会真的执行,只是把命令放到了管道 pipe.execute() #执行管道里的所有命令