Python之异步IO&RabbitMQ&Redis

时间:2023-03-09 15:14:22
Python之异步IO&RabbitMQ&Redis

协程:

1、单线程运行,无法实现多线程。

2、修改数据时不需要加锁(单线程运行),子程序切换是线程内部的切换,耗时少。

3、一个cpu可支持上万协程,适合高并发处理。

4、无法利用多核资源,因为协程只有一个线程。

使用yield实现协程:

import time
import Queue
def consumer(name):
print("--->starting eating baozi...")
while True:
new_baozi = yield
print("[%s] is eating baozi %s" % (name,new_baozi))
#time.sleep(1) def producer():
r = con.next()#拥有yield的函数是迭代起,使用next()方法取值。
r = con2.next()
n = 0
while n < 5:
n +=1
con.send(n)
con2.send(n)
print("\033[32;1m[producer]\033[0m is making baozi %s" %n ) if __name__ == '__main__':
con = consumer("c1")
con2 = consumer("c2")
p = producer()

gevent:

gevent是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

安装第三方库:

sudo apt-get install libevent-dev
sudo apt-get install python-dev
sudo easy-install gevent

eg:

import gevent

def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again') def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar') gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
]) #执行结果:
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar

异步IO:

程序在遇到IO操作时,会切换执行其他任务。

from gevent import monkey; monkey.patch_all()
import gevent
from urllib2 import urlopen def f(url):
print('GET: %s' % url)
resp = urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url)) gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])

事件驱动与异步IO:

单线程、多线程与事件驱动编程模型的比较如下图:

这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。

Python之异步IO&RabbitMQ&Redis

在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。

在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。

在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中(程序遇到IO操作时,会将该操作任务放到操作系统提供的高速队列中,切换去执行其他程序的非IO操作。待该IO操作结束后,需回调IO结果,这个过程中该任务需要不断的循环去询问队列中IO操作是否完成),然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。

当我们面对如下的环境时,事件驱动模型通常是一个好的选择:

  1. 程序中有许多任务,而且…
  2. 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
  3. 在等待事件到来时,某些任务会阻塞。

当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。

网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。

(参考连接:http://www.cnblogs.com/Anker/p/3254269.html)

常用异步IO模型:Select、Poll和Epoll

select:

程序在向操作系统的队列中询问IO操作是否完成时,队列会返回给程序整个队列里所有注册的IO任务列表(程序将队列中所有IO列表copy一份)。此时程序会循环所有IO任务列表来寻找属于自己注册的那个,这样效率很低,消耗资源。系统默认打开文件个数为1024,若队列中IO个数超过此数量,还需要修改系统设置。

poll:

poll和select没有本质区别,只是将最大文件量的限制取消了。

epoll:

程序去队列中查找IO任务时,队列回返回程序一个队列的描述符(并不是返回整个队列IO列表)。epoll还采用了内存映射(mmap)技术,将内核内存(内核态)映射为一个文件,使得程序(用户态)可以直接访问,不需要再复制队列列表。

关于这三者的比较,可以参考这篇文章:

http://www.cnblogs.com/Anker/p/3265058.html

Python MySQL API

参考地址:http://www.cnblogs.com/wupeiqi/articles/5095821.html

Redis

下载安装:http://redis.io/

tar zxvf redis-3.0.7.tar.gz
cd redis-3.0.7
make

启动服务端:(默认端口号为6379)

cd src/
./redis-server

启动客户端:

./redis-cli

常用操作:

127.0.0.1:6379> keys *#查看所有键值
(empty list or set)
127.0.0.1:6379> set name ahaii#添加值
OK
127.0.0.1:6379> get name#根据键查看值
"ahaii"
127.0.0.1:6379> set name ahaii ex 5#设置值的有效时间
OK
127.0.0.1:6379> get name
"ahaii"
127.0.0.1:6379> get name
(nil)

Python操作Redis:

安装redis模块:

pip install redis

基本操作:

#!/usr/bin/python

import redis

r = redis.Redis(host='localhost',port=6379)
r.set('name','gom')
print r.get('name')

连接池:

可以设置一个连接池,使多个redis实例共享一个连接池,避免每次建立、释放连接的开销。

#!/usr/bin/python

import redis

r = redis.Redis(host='localhost',port=6379)
r.set('name','gom')
print r.get('name')

常用方法:

set:

set(name, value, ex=None, px=None, nx=False, xx=False)

redis中设置值时,默认若不存在则创建该值,若存在则修改该值。

  ex:过期时间(秒)

  px:过期时间(毫秒)

  nx:如果设置为True,则只有name不存在时,当前set操作才执行

  xx:如果设置为True,则只有name存在时,岗前set操作才执行

setnx(name,value):只有name不存在时,执行添加操作

setex(name,value,time):设置值和过期时间(秒)

psetex(name,value,time):设置值和过期时间(毫秒)

mset(*args, **kwargs):批量设置,如

mset(k1='v1', k2='v2')

mget({'k1': 'v1', 'k2': 'v2'})

get:取值

getrange:截取一段,类似列表切片

import redis

r = redis.Redis(host='localhost',port=6379)
r.set('id','qwerty')
print r.getrange('id',2,4)#截取从第2到4个字符(从0开始) #执行结果:ert

其他操作参考以下博客:

http://www.cnblogs.com/wupeiqi/articles/5132791.html

http://www.jb51.net/article/56448.htm

Redis订阅与发布

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 客户端可以订阅任意数量的频道。

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

Python之异步IO&RabbitMQ&Redis

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

Python之异步IO&RabbitMQ&Redis

实例:

创建名为redisChat的订阅频道:

127.0.0.1:6379> SUBSCRIBE redisChar
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChar"
3) (integer) 1

在另外一个redis客户端中发布消息:

127.0.0.1:6379> PUBLISH redisChar 'hello ahaii'
(integer) 1
127.0.0.1:6379> PUBLISH redisChar 'learn redis'
(integer) 1

此时,第一个客户端中就会收到这两条消息:

1) "message"
2) "redisChar"
3) "hello ahaii"
1) "message"
2) "redisChar"
3) "learn redis"

RabbitMQ:

RabbitMQ 是信息传输的中间者。本质上,他从生产者(producers)接收消息,转发这些消息给消费者(consumers)。换句话说,他能够按根据你指定的规则进行消息转发、缓冲、和持久化。

ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)可以从Queue中获取消息并消费。

Python之异步IO&RabbitMQ&Redis

多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

Python之异步IO&RabbitMQ&Redis

RabbitMQ安装:

sudo apt-get install rabbitmq-server

启动服务:

sudo /etc/init.d/rabbitmq-server start

安装Python的RabbitMQ API:

sudo easy_install pika

生产者与消费者实例:

生产者:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))#指定主机

channel = connection.channel()#生成一个管道

channel.queue_declare(queue='hello')#在管道中创建一个队列

channel.basic_publish(exchange='',routing_key='hello',body='hello ahaii')
#exchange:交换器,消息首先到达exchange,exchange会根据队列的名字,将消息转发到指定的队列。
#routing_key:指定队列名字
#body:消息体 print 'send hello ahaii'
connection.close()

消费者:

import  pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()#创建管道 channel.queue_declare(queue='hello')#创建队列
#消费者创建队列的目的是,当生产者(程序)晚于消费者启动时,该队列依然存在(由消费者创建),这样避免因队列不存在而报错 def callback(ch,method,properties,body):#回调函数,参数固定
print 'recived %s' %body channel.basic_consume(callback,queue='hello',no_ack=True)#当从hello收到消息后,调用回调函数(callback),no_ack表示是否需要确认该任务正常执行完毕
print 'waiting for messaage:'
channel.start_consuming()#开始阻塞,接收任务

消息持久化:

将消息持久化后,服务重启后,消息依然存在。

持久化参数:

channel.queue_declare(queue='hello', durable=True)

参考:http://blog.****.net/column/details/rabbitmq.html