python codis集群客户端(一) - 基于客户端daemon探活与服务列表维护

时间:2023-03-09 06:01:32
python codis集群客户端(一) - 基于客户端daemon探活与服务列表维护

在使用codis时候,我们遇到的场景是,公司提供了HA的Proxy(例如N个),但是不暴露zookeeper(也就是说没有codis后端服务列表)。

如果暴露zk的话,可以看这一篇http://www.cnblogs.com/kangoroo/p/7485760.html

要求在开发客户端api的过程中,自己进行探活&故障摘除&负载均衡。

我这里做了一个简单的实现,提供给大家参考。本实例支持使用在server或者daemon中。

我们的实现叫做pycodis。

python codis集群客户端(一) - 基于客户端daemon探活与服务列表维护

1、核心文件CodisPool.py

在这个文件里,实现了多服务端实例链接,探活与故障摘除,负载均衡策略(随机&roundrobin)。

1)探活

由于公司不暴露CodisProxy的zookeeper,我们在编写客户端程序的时候无法获取活跃的codisProxy服务列表。我们不得不自己去探活并且保存活跃的服务列表。

我们通过在初始化CodisPool实例时候,启动了后台线程_check_available_backgroud,去轮询配置的codisProxy列表,我们的探活与摘除满足以下3个要求:

a)如果在一定的阈值时间段内(默认3s),某个codisProxy始终无法提供服务,就暂时将它从codisProxy列表中摘除;

b)被摘除的codisProxy实例无法通过get_connection得到,但是在codisProxy列表中保留它,让它可以在满足条件的情况下复起;

c)当被摘除的codisProxy恢复了,就把它放到可用codisProxy列表中,这样通过get_connetion又能拿到它。

注:这里通过get_connection拿到的proxy,其实现方式是redis链接池。

2)负载均衡

我们通过get_connection这个函数在proxy列表中得到一个可用链接,那么获取可用链接的负载均衡算法是怎样的呢?

PickUp抽象类,定义了负载均衡类需要实现的方法,pick_up()。

我们实现了两种负载均衡算法:

a)RandomPickUp,随机负载均衡

b)RoundRobinPickUp,轮询负载均衡

3)单例模式的保证

为了不至于在程序运行时创建很多的链接池实例,这是违反设计初衷的,链接池就没有意义了,我们需要实现为线程安全的单例模式。

在CodisPool实现中只保证了不会额外建立太多的链接池--我们在__init__中一次性创建好了Podis实例,并且放入到proxy列表中。单例模式留待使用中去实现,最简单的方式是,在python的module中先把CodisPool的实例建立好。然后其他的线程在去访问。可以参考我们的example实例。

# -*- coding:utf-8 -*-
import abc
import uuid
import time
import logging
import traceback
import threading
import redis
from Podis import Podis logger = logging.getLogger(__name__) class CodisPool(object): def __init__(self, codis_config):
self._pool_shards = []
self._availables = []
self._connection = None
self._create_pool(codis_config)
self._check_available_backgroud() def _check_available_backgroud(self):
bg = Background(self._check_pool_shards)
bg.start() def _create_pool(self, codis_config):
address_list = codis_config.get('addrs').split(',')
for address in address_list:
host = address.split(':')[0]
port = address.split(':')[1]
self._pool_shards.append(
Podis(
redis.ConnectionPool(
host=host, port=port, db=0,
password=codis_config.get('password'),
max_connections=codis_config.get('max_connections')
)
)
)
self._availables.append(True)
if len(self._pool_shards) == 0:
logger.error('创建codis链接池失败')
raise Exception('创建codis链接池失败') def _check_pool_shards(self):
while True:
self._pool_shards_is_available() def _pool_shards_is_available(self, retry_num=3):
i = 0
for pool in self._pool_shards:
try:
retry = retry_num
go_on = True
while go_on and retry > 0:
try:
pong = pool.ping()
if not pong:
retry -= 1
else:
go_on = False
except Exception, ex:
retry -= 1
raise
finally:
time.sleep(1)
if retry <= 0:
self._availables[i] = False
else:
self._availables[i] = True
except Exception, ex:
logger.error(traceback.format_exc())
finally:
i += 1 def _get_available_shards(self):
i = 0
available_shards = []
for shard in self._pool_shards:
if self._availables[i]:
available_shards.append(shard)
i += 1
return available_shards def get_connection(self, pick_up=None):
if isinstance(pick_up, PickUp):
codisPool = pick_up.pick_up(self._get_available_shards())
else:
pick_up = RandomPickUp()
codisPool = pick_up.pick_up(self._get_available_shards())
return codisPool def get_pool_shards(self):
return self._pool_shards def get_availables(self):
return self._get_available_shards() class Background(object):
def __init__(self, target, daemon=True):
self.daemon = daemon
self.thread = threading.Thread(target=target) def start(self):
self.thread.setDaemon(self.daemon)
self.thread.start() class PickUp(object): __metaclass__ = abc.ABCMeta @abc.abstractmethod
def __init__(self):
pass @abc.abstractmethod
def pick_up(self, pool_list):
return class RandomPickUp(PickUp):
def __init__(self):
PickUp.__init__(self) def pick_up(self, pool_list):
pool_size = len(pool_list)
index = abs(hash(uuid.uuid4())) % pool_size
pool = pool_list[index]
print "RandomPickUp, 拿到第", index, "个pool"
return pool class RoundRobinPickUp(PickUp): def __init__(self):
PickUp.__init__(self)
self.index = 0
self.round_robin_lock = threading.Lock() def pick_up(self, pool_list):
with self.round_robin_lock:
pool_size = len(pool_list)
self.index += 1
index = abs(self.index) % pool_size
pool = pool_list[index]
print "RoundRobinPickUp, 拿到第", index, "个pool"
return pool

2、Podis,实际的句柄资源

在CodisPool获得的句柄就是本类的一个实例。

# -*- coding:utf-8 -*-
import redis
import logging
import traceback logger = logging.getLogger(__name__) def redis_getter(func):
def wrapper(*args, **kwargs):
try:
result = func(*args, **kwargs)
return result or None
except Exception, ex:
logger.error(traceback.format_exc())
raise
return wrapper def redis_setter(func):
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
return True
except Exception, ex:
logger.error(traceback.format_exc())
raise
return wrapper class Podis(object): def __init__(self, pool):
self._connection = redis.StrictRedis(connection_pool=pool) @redis_getter
def ping(self):
return self._connection.ping() @redis_getter
def get(self, key):
return self._connection.get(key) @redis_setter
def set(self, key, value):
self._connection.set(key, value) @redis_setter
def lpush(self, key, *value):
self._connection.lpush(key, *value) @redis_getter
def lpop(self, key):
return self._connection.lpop(key) @redis_getter
def lrange(self, key, start, end):
return self._connection.lrange(key, start, end) @redis_setter
def sadd(self, key, *value):
self._connection.sadd(key, *value) @redis_setter
def srem(self, key, *value):
self._connection.srem(key, *value) @redis_getter
def zrange(self,key,start,end):
return self._connection.zrange(key,start,end) @redis_getter
def zrevrange(self,key,start,end):
return self._connection.zrevrange(key,start,end,withscores=True) @redis_getter
def zscore(self,key,*value):
return self._connection.zscore(key,value) @redis_setter
def zadd(self,key,score,*value):
self._connection.zadd(key,score,value) @redis_getter
def smembers(self, key):
return self._connection.smembers(key) @redis_getter
def hgetall(self, key):
return self._connection.hgetall(key) @redis_getter
def hget(self, key, name):
return self._connection.hget(key, name) @redis_getter
def hkeys(self, key):
return self._connection.hkeys(key) @redis_setter
def hset(self, key, name, value):
self._connection.hset(key, name, value) @redis_setter
def hmset(self, name, mapping):
self._connection.hmset(name, mapping) @redis_setter
def hdel(self, key, name):
self._connection.hdel(key, name) @redis_setter
def delete(self, *key):
self._connection.delete(*key) # codis不支持
@redis_getter
def keys(self, pattern):
return self._connection.keys(pattern) @redis_setter
def expire(self, key, time):
return self._connection.expire(key, time) @redis_getter
def ttl(self, key):
return self._connection.ttl(key)

3、配置文件CodisConfig.py

这里我没有对最大连接数,超时时间等等做配置,你可以根据你的场景自行添加。

codis_config = {
'addrs': '100.90.186.47:3000,100.90.187.33:3000'
}

4、单测和使用

我们模拟在并发场景下,对资源的获得和释放情况。

import time
import threading
from pycodis.CodisConfig import codis_config
from pycodis.CodisPool import CodisPool, RoundRobinPickUp codis_pool1 = CodisPool(codis_config)
print '------1-------'
pick_up1 = RoundRobinPickUp()
print '------2-------'
codis_pool2 = CodisPool(codis_config)
print '------3-------'
pick_up2 = RoundRobinPickUp()
print '------4-------' def func(i):
for i in range(10):
podis1 = codis_pool1.get_connection(pick_up=pick_up1)
podis2 = codis_pool2.get_connection(pick_up=pick_up2)
podis1.delete(i)
podis2.delete(i)
time.sleep(1) thread_list = []
for i in range(100):
thread_list.append(threading.Thread(target=func, args=[i])) for thread in thread_list:
thread.setDaemon(True)
thread.start() time.sleep(10)

可以看到打印的信息,每次都不一样

-------------
-------------
-------------
-------------
RoundRobinPickUp, 拿到第 个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 个pool
个pool
RoundRobinPickUp, 拿到第 个poolRoundRobinPickUp, 拿到第
个pool
RoundRobinPickUp, 拿到第 1RoundRobinPickUp, 拿到第 个pool
个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第0 个pool
个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 个pool
RoundRobinPickUp, 拿到第个pool
1RoundRobinPickUp, 拿到第 个pool
RoundRobinPickUp, 拿到第 个pool个pool RoundRobinPickUp, 拿到第 1RoundRobinPickUp, 拿到第 个pool
个poolRoundRobinPickUp, 拿到第
0RoundRobinPickUp, 拿到第 个pool
RoundRobinPickUp, 拿到第 个pool
个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 个pool
个pool
RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 个pool个pool RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 个pool
RoundRobinPickUp, 拿到第 个pool
RoundRobinPickUp, 拿到第 个pool1
RoundRobinPickUp, 拿到第个pool
个poolRoundRobinPickUp, 拿到第
RoundRobinPickUp, 拿到第 0个pool
RoundRobinPickUp, 拿到第 个pool1
个poolRoundRobinPickUp, 拿到第
RoundRobinPickUp, 拿到第 个pool 个pool RoundRobinPickUp, 拿到第 0RoundRobinPickUp, 拿到第 个pool
个poolRoundRobinPickUp, 拿到第 个pool RoundRobinPickUp, 拿到第 RoundRobinPickUp, 拿到第 个pool
个pool
RoundRobinPickUp, 拿到第RoundRobinPickUp, 拿到第 个pool
个poolRoundRobinPickUp, 拿到第
个pool
RoundRobinPickUp, 拿到第RoundRobinPickUp, 拿到第 个pool
RoundRobinPickUp, 拿到第 个pool个pool