Python 之路:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

时间:2022-05-20 11:03:11
一、Memcached

Memcached是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负债。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于一个存储键/值对的hashmap。其守护进程(daemon)是用C写的,但是客户端可以用户任何语言来编写,并通过memcached协议与守护进程通信。

Memcached安装和基本使用

Memcached安装:

root@wulaoer:~# rpm -qa|grep libevent #检查服务器是否安装libevent
root@wulaoer:~# whet https://github.com/downloads/libevent/libevent/libevent-2.0.21-stable.tar.gz #下载libevent
root@wulaoer:~# tar -zxvf libevent-2.0.21-stable.tar.gz #解压
root@wulaoer:~# ./configure --prefix=/usr/local/libevent (指定安装到/usr/local/libevent目录下) #安装,指定路径
root@wulaoer:~# make #编译
root@wulaoer:~# make install
root@wulaoer:~# wget http://memcached.org/files/memcached-1.4.25.tar.gz #下载memcached
root@wulaoer:~# tar -zxvf memcached-1.x.x.tar.gz
root@wulaoer:~# cd memcached-1.x.x
root@wulaoer:~# ./configure && make && make test && sudo make install
若安装过程中提示找不到libevent路径时,使用--with-libevent=libevent安装的目录
./configure --prefix=/usr/local/memcached-1.4.17 --with-libevent=/usr/local/libevent/ PS:依赖libevent
yum install libevent-devel
apt-get install libevent-dev

启动Memcached

root@wulaoer:~# memcached -d -m 10    -u root -l 192.168.17.128 -p 12000 -c 256 -P /tmp/memcached.pid
参数说明:
-d 是启动一个守护进程
-m 是分配给Memcache使用的内存数量,单位是MB
-u 是运行Memcache的用户
-l 是监听的服务器IP地址
-p 是设置Memcache监听的端口,最好是1024以上的端口
-c 选项是最大运行的并发连接数,默认是1024,按照你服务器的负载量来设定
-P 是设置保存Memcache的pid文件

Memcached命令

存储命令: set/add/replace/append/prepend/cas
获取命令: get/gets
其他命令: delete/stats..

Python操作Memcached

安装API

python操作Memcached使用Python-memcached模块
下载安装:https://pypi.python.org/pypi/python-memcached

1、第一次操作

import memcache

mc = memcache.Client(['192.168.17.128:12000'], debug=True)
mc.set("foo", "bar")
ret = mc.get('foo')
print ret

Ps:debug = True 表示运行出现错误时,显示错误信息,上线后移除该参数。  

2、天生支持集群

python-memcached模块原生支持集群操作,其原理是在内存维护一个主机列表,且集群中主机的权重值和主机在列表中重复出现的次数成正比。

     主机    权重
1.1.1.1 1
1.1.1.2 2
1.1.1.3 1 那么在内存中主机列表为:
host_list = ["1.1.1.1", "1.1.1.2", "1.1.1.2", "1.1.1.3", ]

如果用户根据要求要在内存中创建一个键值对(如:k1 = “v1”),那么要执行一下步骤:

  • 根据算法将k1转换成一个数字
  • 将数字和主机表长度求余数,得到一个值N( 0 <= N < 列表长度 ) 
  • 在主机列表中根据 第2步得到的值为索引获取主机,例如:host_list[N]
  • 链接将第3步中获取的主机,将 k1 = “v1”放置在该服务器的内存中

代码如下:

mc = memcache.Client([('1.1.1.1:12000', 1), ('1.1.1.2:12000', 2), ('1.1.1.3:12000', 1)], debug=True)#后面的1、2、1就是权重

mc.set('k1', 'v1')

3、add

添加一条键值对,如果已经存在的key,重复执行add操作异常

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
mc = memcache.Client(['192.168.17.128:12000'], debug=True)
mc.add('k1', 'v1') #添加一个键值对
# mc.add('k1', 'v2') # 报错,对已经存在的key重复添加,失败!!!

Ps:在添加的键值对中,key不得重复,运行一次添加一次,重复运行会出现(emCached: while expecting 'STORED', got unexpected response 'NOT_STORED')添加失败。

4、replace

replace修改某个key的值,如果key不存在,则异常

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache mc = memcache.Client(['192.168.17.128:12000'], debug=True)
# 如果memcache中存在kkkk,则替换成功,否则一场
mc.replace('kkkk','999')#修改kkk的键值对值

Ps:锁修改的键值对的key必须存在,如果不存在不能直接创建,就会报错(MemCached: while expecting 'STORED', got unexpected response 'NOT_STORED')。

5、set和set_multi

  • set     设置一个键值对,如果key不存在,则创建,如果key存在,则修改  
  • set_multi 设置多个键值对,如果key不存在,则创建,如果key存在,则修改
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache mc = memcache.Client(['192.168.17.128:12000'], debug=True) mc.set('key0', 'wupeiqi')#创建一个键值对或修改key0的键值对 #mc.set_multi({'key1': 'val1', 'key2': 'val2'})#创建或修改多个键值对

set_multi实例

#!/usr/bin/env  python
# --*--coding:utf- --*--
import memcache
mc = memcache.Client(['192.168.17.129:12000'], debug=True)
mc.set_multi({'k6': 'v4', 'k7': 'v5'})
ret = mc.get('k6')
set = mc.get('k7')
print ret,set

Ps: add和set的区别,add是对于存在的报错,不存在的创建,set是对于存在的修改,不存在的创建。

6、delete和delete_multi

  • delete     在Memcached中删除指定的一个键值对
  • delete_multi 在Memcached中删除指定的多个键值对
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache mc = memcache.Client(['192.168.17.128:12000'], debug=True) mc.delete('key0') #删除key0的键值对
#mc.delete_multi(['key1', 'key2']) #删除key1、key2的键值对

Ps:delete删除之后key的值为None

7、get和get_multi

  • get     获取一个键值对
  • get_multi 获取多一个键值对
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache mc = memcache.Client(['192.168.17.128:12000'], debug=True) val = mc.get('key0') #获取key0的键值对值
#item_dict = mc.get_multi(["key1", "key2", "key3"])#获取key1、key2、key3的键值对的值

8、append和perpend

  • append   修改指定key的值,在该值后面追加内容
  • perpend  修改指定key的值,在该值前面插入内容
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache mc = memcache.Client(['192.168.17.128:12000'], debug=True)
# k1 = "v1" mc.append('k1', 'after')#在k1后追加after
# k1 = "v1after" mc.prepend('k1', 'before')#在k1前追加before
# k1 = "beforev1after"

9、decr和incr

  • incr  自增,将Memcached中的某一个值增加N( N默认为1 )
  • decr  自减,将Memcached中的某一个值减少N( N默认为1 )
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache mc = memcache.Client(['10.211.55.4:12000'], debug=True)
mc.set('k1', '777')#修改键值对k1的值为777 mc.incr('k1')#将键值对k1的值增加1,如果k1后面没有跟值,
# k1 = 778 mc.incr('k1', 10)#将键值对k1的值增加10,k1后面的值是多少就增加多少。
# k1 = 788 mc.decr('k1')#将键值对k1的值自减1,
# k1 = 787 mc.decr('k1', 10)#将键值对k1的值减10,k1后面的值是多少就减少多少。
# k1 = 777

10、gets和cas

如商城商品剩余个数,假设改值保存在memcache中,product_count = 900
A用户刷新页面从memcache中读取到product_count = 900
B用户刷新页面从memcache中读取到product_count = 900

如果A、B用户均购买商品

A用户修改商品剩余个数 product_count=899
B用户修改商品剩余个数 product_count=899

如此一来缓存内的数据便不在正确,两个用户购买商品后,商品剩余还是 899
如果使用python的set和get来操作以上过程,那么程序就会如上述所示情况!

如果想要避免此情况的发生,只要使用 gets 和 cas 即可,如:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import memcache
mc = memcache.Client(['192.168.17.128:12000'], debug=True, cache_cas=True) v = mc.gets('product_count')
# ...
# 如果有人在gets之后和cas之前修改了product_count,那么,下面的设置将会执行失败,剖出异常,从而避免非正常数据的产生
mc.cas('product_count', "899")

Ps:本质上每次执行gets时,会从memcache中获取一个自增的数字,通过cas去修改gets的值时,会携带之前获取的自增值和memcache中的自增值进行比较,如果相等,则可以提交,如果不想等,那表示在gets和cas执行之间,又有其他人执行了gets(获取了缓冲的指定值), 如此一来有可能出现非正常数据,则不允许修改。

Memcached 真的过时了吗?

二、Redis

redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。

Redis安装和基本使用

wget http://download.redis.io/releases/redis-3.0.6.tar.gz
tar xzf redis-3.0.6.tar.gz
cd redis-3.0.6
make test

启动服务端

是在redis目录下启用服务
src/redis-server
#如果启动服务的时候,默认使用的是localhost的IP的话,可以在redis.conf里注释bind 本机IP。在重启以下src/redis-server redis.conf

启动客户端

src/redis-cli
redis> set foo bar #创建一个键值对
OK
redis> get foo #获取foo的键值对
"bar"

Python操作Redis

安装API

sudo pip install redis
or
sudo easy_install redis
or
源码安装 详见:https://github.com/WoLpH/redis-py

常用操作

1、操作模式

redis-py提供两个类Redis和StrictRedis用于实现Redis的命令,StrictRedis用于实现大部分官方的命令,并使用官方的语法和命令,Redis是StrictRedis的子类,用于向后兼容旧版本的redis-py。

#!/usr/bin/env python
# -*- coding:utf-8 -*- import redis r = redis.Redis(host='192.168.17.128', port=6379) #连接redis服务器,端口是6379
r.set('foo', 'Bar')#创建一个键值对
print r.get('foo')#打印键值对foo的值

2、连接池

redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。

#!/usr/bin/env python
# -*- coding:utf-8 -*- import redis pool = redis.ConnectionPool(host='192.168.17.128', port=6379)#建立连接,用pool来管理避免断开
r = redis.Redis(connection_pool=pool)#redis共享连接池
r.set('foo', 'Bar')#建键值对
print r.get('foo')#打印foo的键值对的值

ConnectionPool类

class ConnectionPool(object):
...........
def __init__(self, connection_class=Connection, max_connections=None,
**connection_kwargs): #类初始化时调用构造函数
max_connections = max_connections or **
if not isinstance(max_connections, (int, long)) or max_connections < : #判断输入的max_connections是否合法
raise ValueError('"max_connections" must be a positive integer')
self.connection_class = connection_class #设置对应的参数
self.connection_kwargs = connection_kwargs
self.max_connections = max_connections
self.reset() #初始化ConnectionPool 时的reset操作
def reset(self):
self.pid = os.getpid()
self._created_connections = #已经创建的连接的计数器
self._available_connections = [] #声明一个空的数组,用来存放可用的连接
self._in_use_connections = set() #声明一个空的集合,用来存放已经在用的连接
self._check_lock = threading.Lock()
.......
def get_connection(self, command_name, *keys, **options): #在连接池中获取连接的方法
"Get a connection from the pool"
self._checkpid()
try:
connection = self._available_connections.pop() #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组,
会直接调用make_connection方法
except IndexError:
connection = self.make_connection()
self._in_use_connections.add(connection) #向代表正在使用的连接的集合中添加元素
return connection
def make_connection(self): #在_available_connections数组为空时获取连接调用的方法
"Create a new connection"
if self._created_connections >= self.max_connections: #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化
raise ConnectionError("Too many connections")
self._created_connections += #把代表已经创建的连接的数值+
return self.connection_class(**self.connection_kwargs) #返回有效的连接,默认为Connection(**self.connection_kwargs)
def release(self, connection): #释放连接,链接并没有断开,只是存在链接池中
"Releases the connection back to the pool"
self._checkpid()
if connection.pid != self.pid:
return
self._in_use_connections.remove(connection) #从集合中删除元素
self._available_connections.append(connection) #并添加到_available_connections 的数组中
def disconnect(self): #断开所有连接池中的链接
"Disconnects all connections in the pool"
all_conns = chain(self._available_connections,
self._in_use_connections)
for connection in all_conns:
connection.disconnect()

3、管道

redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import redis
pool = redis.ConnectionPool(host='192.168.17.128', port=6379)#建立连接,用pool来管理避免断开
r = redis.Redis(connection_pool=pool)#redis共享连接池
# pipe = r.pipeline(transaction=False)
pipe = r.pipeline(transaction=True) #连接池申请连接
r.set('name', 'wulaoer') #插入键值对
r.set('age', 18)
pipe.execute()#申请断开

4、发布订阅

#!/usr/bin/env python
# -*- coding:utf- -*- import redis class RedisHelper: def __init__(self):
self.__conn = redis.Redis(host='192.168.17.128')#连接redis服务器
self.chan_sub = 'fm104.5'#定义值
self.chan_pub = 'fm104.5'#定义值 def public(self, msg):#定义发布函数
self.__conn.publish(self.chan_pub, msg)
return True def subscribe(self):#定义接收函数
pub = self.__conn.pubsub()
pub.subscribe(self.chan_sub)
pub.parse_response()
return pub

订阅者:

#!/usr/bin/env python
# -*- coding:utf-8 -*- from monitor.RedisHelper import RedisHelper#导入连接 obj = RedisHelper()#定义类
redis_sub = obj.subscribe()#调用类中的接收函数 while True:
msg= redis_sub.parse_response()#循环接收,如果存在值,就打印出来
print msg

发布者:

#!/usr/bin/env python
# -*- coding:utf-8 -*- from monitor.RedisHelper import RedisHelper#导入连接 obj = RedisHelper()#定义类
obj.public('hello')#调用类中发布函数

更多参见:https://github.com/andymccurdy/redis-py/

     http://www.jb51.net/article/64646.htm

三、RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

RabbitMQ安装

安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 安装erlang
$ yum -y install erlang#如果你是使用ubuntu的话就用sudo spt-get install erlang 安装RabbitMQ
$ yum -y install rabbitmq-server#如果你是使用ubuntu的话就用sudo apt-get install rabbitmq-server

注意:service rabbitmq-server start/stop

安装API

pip install pika#如果你是使用ubuntu的话就用sudo pip install pika
or easy_install pika or 源码 https://pypi.python.org/pypi/pika

使用API操作RabbitMQ

基于Queue实现生产者消费者模型

#!/usr/bin/env python
# -*- coding:utf- -*-
import Queue
import threading message = Queue.Queue() def producer(i):
while True:
message.put(i) def consumer(i):
while True:
msg = message.get() for i in range():
t = threading.Thread(target=producer, args=(i,))
t.start() for i in range():
t = threading.Thread(target=consumer, args=(i,))
t.start()

对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

#!/usr/bin/env python
import pika # ######################### 生产者 ######################### connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))#连接主机
channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
#!/usr/bin/env python
import pika # ########################## 消费者 ########################## connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body) channel.basic_consume(callback,
queue='hello',
no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

1、acknowledgment 消息不丢失

no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='10.211.55.4'))
channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep()
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,
queue='hello',
no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消费者

2、durable   消息不丢失

#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel() # make message persistent
channel.queue_declare(queue='hello', durable=True) channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=, # make message persistent
))
print(" [x] Sent 'Hello World!'")
connection.close()
复制代码

生产者

#!/usr/bin/env python
# -*- coding:utf- -*-
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel() # make message persistent
channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep()
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,
queue='hello',
no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消费者

3、消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

#!/usr/bin/env python
# -*- coding:utf- -*-
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel() # make message persistent
channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep()
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=) channel.basic_consume(callback,
queue='hello',
no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

消费者

4、发布订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

exchange type = fanout

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout') message = ' '.join(sys.argv[:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()

发布者

#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue channel.queue_bind(exchange='logs',
queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r" % body) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

订阅者

5、关键字发送

exchange type = direct

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue severities = sys.argv[:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[])
sys.exit() for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

消费者

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') severity = sys.argv[] if len(sys.argv) > else 'info'
message = ' '.join(sys.argv[:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

生产者

6、模糊匹配

exchange type = topic

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词
  • *  表示只能匹配 一个 单词
发送者路由值              队列中
old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配
#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = sys.argv[:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[])
sys.exit() for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

消费者

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') routing_key = sys.argv[] if len(sys.argv) > else 'anonymous.info'
message = ' '.join(sys.argv[:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

生产者

四、SQLAlchemy

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

Python 之路:Python操作 RabbitMQ、Redis、Memcache、SQLAlchemy

Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

步骤一:

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

#!/usr/bin/env python
# -*- coding:utf-8 -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123456@192.168.17.129:3306/wulaoer)#连接数据库格式:mysql+mysqldb://用户名:数据库密码@数据库ip:数据库端口/数据库名称
engine.execute(                #在数据库laowu表中插入数据
  "INSERT INTO laowu (a, b) VALUES ('2', 'v1')"
)
engine.execute(                #在数据库表中插入数据
  "INSERT INTO laowu (a, b) VALUES (%s, %s)",
  ((555, "v1"),(666, "v1"),)
)
engine.execute(                #在数据库表中插入数据
  "INSERT INTO laowu (a, b) VALUES (%(id)s, %(name)s)",
  id=999, name="v1"
)
result = engine.execute('select * from laowu')
result.fetchall()
#!/usr/bin/env python
# -*- coding:utf- -*- from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123456@192.168.17.129:3306/wulaoer", max_overflow=) # 事务操作
with engine.begin() as conn: #创建一个表
conn.execute("insert into table (x, y, z) values (1, 2, 3)")
conn.execute("my_special_procedure(5)") conn = engine.connect()
# 事务操作
with conn.begin():
conn.execute("some statement", {'x':, 'y':})

事务操作

注:查看数据库连接:show status like 'Threads%';

步骤二:

使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。

#!/usr/bin/env python
# -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata,    #新建一个表user,id列和name列
Column('id', Integer, primary_key=True),
Column('name', String(20)),
) color = Table('color', metadata,   #新建一个表color,id列和name列
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
engine = create_engine("mysql+mysqldb://root:123456@192.168.17.129:3306/wulaoer", max_overflow=5)#连接数据库,最多有5个进程
metadata.create_all(engine)
# metadata.clear()
# metadata.remove()
#!/usr/bin/env python
# -*- coding:utf- -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata,#创建列表
Column('id', Integer, primary_key=True),
Column('name', String()),
) color = Table('color', metadata,#创建列表
Column('id', Integer, primary_key=True),
Column('name', String()),
)
engine = create_engine("mysql+mysqldb://root:123456@192.168.17.129:3306/wulaoer", max_overflow=) conn = engine.connect() # 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name)#增加数据
# conn.execute(user.insert(),{'id':,'name':'seven'})
# conn.close() # sql = user.insert().values(id=, name='wu')#插入数据
# conn.execute(sql)
# conn.close() sql = user.delete().where(user.c.id > )#删 # sql = user.update().values(fullname=user.c.name)#修改
# sql = user.update().where(user.c.name == 'jack').values(name='ed') # sql = select([user, ])#查看
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name) # result = conn.execute(sql)
# print result.fetchall()
# conn.close()

增删改查

更多内容详见:

http://www.jianshu.com/p/e6bba189fcbd

http://docs.sqlalchemy.org/en/latest/core/expression_api.html

注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。

步骤三:

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。

#!/usr/bin/env python
# -*- coding:utf-8 -*- from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine engine = create_engine("mysql+mysqldb://root:123456@192.168.17.129:3306/wulaoer", max_overflow=5)#连接数据库,最大5个进程 Base = declarative_base() class User(Base):#创建一个类
__tablename__ = 'user'
id = Column(Integer, primary_key=True)
name = Column(String(50)) # 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息
# Base.metadata.create_all(engine) Session = sessionmaker(bind=engine)
session = Session() # ########## 增 ##########
# u = User(id=2, name='sb')#新建user表
# session.add(u)
# session.add_all([
# User(id=3, name='sb'),
# User(id=4, name='sb')
# ])
# session.commit() # ########## 删除 ##########
# session.query(User).filter(User.id > 2).delete()#删除id大于2的列
# session.commit() # ########## 修改 ##########
# session.query(User).filter(User.id > 2).update({'cluster_id' : 0}).all#
# session.commit()
# ########## 查 ##########
# ret = session.query(User).filter_by(name='sb').first()#查看name是sb的第一列的内存地址
# print ret
# ret = session.query(User).filter_by(name='sb').all()#查看name是sb的所有内存地址
# print ret # ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()#差name列sb,bb的id内存地址
# print ret # ret = session.query(User.name.label('name_label')).all()#查看数据name列
# print ret,type(ret)#打印name列,以及类型 # ret = session.query(User).order_by(User.id).all()#查看所有id列的内存地址
# print ret # ret = session.query(User).order_by(User.id)[1:3]#查看id2到3的内存地址
# print ret
# session.commit()  

更多功能参见文档,猛击这里下载PDF