I/O Multiplexing in Practice: epoll

Date: 2023-03-10 01:48:14

Concept

I/O multiplexing means that as soon as the kernel notices that one or more of the I/O conditions a process has registered interest in are ready (for example, ready for reading), it notifies that process.

An intuitive picture (borrowed from a post online)

The terminology is a mouthful; what matters is the idea. An epoll scenario: one bartender (a single thread) faces a crowd of drunks slumped at the bar. Suddenly one of them shouts "refill!" (an event); you trot over and pour him a glass, then leave him be. Another one calls for a refill, and you go pour again. One server handles many customers this way, and when nobody is drinking the server is idle and can do something else, like play with his phone. As for how epoll differs from select and poll: with the latter two the drunks never speak up, so you have to go down the line asking each one whether he wants a drink, and there is no time left for the phone. I/O multiplexing, roughly, is these drunks sharing one bartender.

The three functions

1. select

The process tells the kernel which file descriptors (at most 1024 fds) to watch for which events. While no event occurs on any watched descriptor the process stays blocked; when an event occurs on one or more of them, the process is woken up.

When we call select():

1. A context switch into kernel mode takes place.
2. The fd set is copied from user space into kernel space.
3. The kernel iterates over every fd, checking whether its event of interest has occurred.
4. If none has, the process is put to sleep; when a device driver raises an interrupt, or the timeout expires, the process is woken up and the scan repeats.
5. The scanned fd set is returned.
6. The fd set is copied back from kernel space to user space.

fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])

Parameters: select() takes four arguments (the first three are required):
rlist: wait until ready for reading
wlist: wait until ready for writing
xlist: wait for an "exceptional condition"
timeout: timeout in seconds

Return value: three lists. select() watches the given file descriptors (blocking while none of them is ready); when the state of some descriptor changes, it returns three lists:
1. fds from the first argument that became readable are placed in fd_r_list
2. fds from the second argument that are ready for writing are placed in fd_w_list
3. fds from the third argument on which an exceptional condition occurred are placed in fd_e_list
4. If timeout is omitted, select blocks until a watched descriptor changes state. If timeout = n (a positive number) and none of the watched handles changes within n seconds, it returns three empty lists; if a watched handle does change, it returns right away.
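To make the calling convention concrete, here is a minimal select()-based echo server sketch; the host, port and variable names are illustrative, not from the original article:

# Minimal select() echo server (sketch; host/port are placeholders)
import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 9000))
server.listen(5)
server.setblocking(0)

inputs = [server]  # descriptors watched for readability
while inputs:
    # block until at least one watched fd is ready (no timeout given)
    fd_r_list, fd_w_list, fd_e_list = select.select(inputs, [], inputs)
    for s in fd_r_list:
        if s is server:  # the listening socket is readable: a new connection
            conn, addr = server.accept()
            conn.setblocking(0)
            inputs.append(conn)
        else:  # an existing client is readable
            data = s.recv(1024)
            if data:
                s.sendall(data)  # echo it back
            else:  # empty read: the peer closed the connection
                inputs.remove(s)
                s.close()
    for s in fd_e_list:  # exceptional condition: drop the descriptor
        if s in inputs:
            inputs.remove(s)
        s.close()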

On the server side we have to call select() over and over, which means:

1. With many file descriptors, copying the fd set between user space and kernel space on every call gets expensive.
2. With many file descriptors, the kernel's linear scan over all of them also wastes time.
3. select supports at most 1024 file descriptors.

Reference: http://www.cnblogs.com/Anker/archive/2013/08/14/3258674.html

2. poll

Reference: http://www.cnblogs.com/Anker/archive/2013/08/15/3261006.html
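poll() works like select() but takes a list of pollfd structures instead of a fixed-size fd set, so the 1024-descriptor cap disappears (the kernel still scans the whole set on every call). A minimal sketch of the same echo loop with Python's select.poll(); host, port and names are again only illustrative:

# Minimal select.poll() echo loop (sketch)
import select
import socket

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 9001))
server.listen(5)
server.setblocking(0)

poller = select.poll()
poller.register(server.fileno(), select.POLLIN)
fd_to_socket = {server.fileno(): server}  # poll() reports plain fds, so map them back

while True:
    for fd, event in poller.poll():  # a list of (fd, event mask) pairs
        sock = fd_to_socket[fd]
        if sock is server:  # new incoming connection
            conn, addr = server.accept()
            conn.setblocking(0)
            poller.register(conn.fileno(), select.POLLIN)
            fd_to_socket[conn.fileno()] = conn
        elif event & select.POLLIN:
            data = sock.recv(1024)
            if data:
                sock.sendall(data)  # echo
            else:  # peer closed: clean up
                poller.unregister(fd)
                del fd_to_socket[fd]
                sock.close()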

3. epoll

Reference: http://www.cnblogs.com/Anker/archive/2013/08/17/3263780.html

epoll is the result of improving on select and poll; by comparison it has the following advantages:

1. The number of socket descriptors (fds) one process can watch is not artificially capped (only by the operating system's maximum number of open file handles)

select's biggest flaw is that the number of fds a single process can open is limited by FD_SETSIZE, which defaults to 1024. epoll has no such cap: its upper bound is the OS-wide maximum number of file handles, a number far larger than 1024.

2. I/O efficiency does not fall off linearly as the number of fds grows

epoll's answer lies in epoll_ctl. Each new fd is copied into the kernel once, at the moment its event is registered, instead of being copied again on every epoll_wait; epoll guarantees each fd is copied only once over its whole lifetime.

The other fatal weakness of traditional select/poll: when you hold a very large socket set, network latency and idle links mean that at any moment only a small fraction of the sockets are "active", yet every select/poll call scans the entire set linearly, so efficiency drops linearly. epoll does not have this problem, because it only ever touches the "active" sockets: in the kernel, epoll is implemented with a callback attached to each fd, and only a socket that becomes active invokes its callback, while idle sockets never do. In this sense epoll implements a kind of pseudo-AIO.

3. mmap is (said to be) used to speed up message passing between the kernel and user space

At epoll_ctl time, epoll walks the supplied fds once (this pass is unavoidable) and installs a callback on each of them. When a device becomes ready and wakes the waiters on its wait queue, that callback runs and appends the ready fd to a ready list; all epoll_wait really does is check whether this ready list has anything in it.

Whether you use select, poll or epoll, the kernel must hand fd events to user space, so avoiding needless memory copies matters. The claim that epoll achieves this by having kernel and user space mmap the same memory is widely repeated, but note that in mainline Linux the real saving is simply that only the ready events are copied out, rather than the whole descriptor set.

4. epoll's API is simpler

epoll is not the only way to overcome select/poll's shortcomings, merely the Linux one: FreeBSD has kqueue, and /dev/poll is the oldest scheme, from Solaris, each harder to use than the last. epoll remains the simplest of them.

epoll in detail (in Python)

Python's select module is dedicated to I/O multiplexing and provides three methods: select, poll and epoll (the latter two are available only on Linux; Windows supports only select). It also provides kqueue on FreeBSD systems.
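(As an aside: since Python 3.4 the standard library also ships the higher-level selectors module, which picks the best mechanism the platform offers, so portable code rarely needs to choose by hand. A quick way to see what you would get:)

import selectors

sel = selectors.DefaultSelector()  # epoll on Linux, kqueue on BSD/macOS, select elsewhere
print(type(sel).__name__)          # e.g. 'EpollSelector' on Linux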

select.epoll(sizehint=-1, flags=0)  Create an epoll object

epoll.close()                         Close the control file descriptor of the epoll object
epoll.closed                          True if the epoll object is closed
epoll.fileno()                        Return the file descriptor number of the control fd
epoll.fromfd(fd)                      Create an epoll object from a given file descriptor
epoll.register(fd[, eventmask])       Register an fd with the epoll object, together with the events of interest
epoll.modify(fd, eventmask)           Modify the events registered for an fd
epoll.unregister(fd)                  Remove a registered fd from the epoll object
epoll.poll(timeout=-1, maxevents=-1)  Wait for events; timeout is in seconds (float). Blocks until an event occurs on a registered fd, then returns a list of (fd, event) pairs: [(fd1, event1), (fd2, event2), ..., (fdn, eventn)]
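A sketch of the basic call pattern (address and timeout chosen purely for illustration):

import select
import socket

sk = socket.socket()
sk.bind(('127.0.0.1', 9002))
sk.listen(5)

ep = select.epoll()
ep.register(sk.fileno(), select.EPOLLIN)  # watch the listening socket for read events
events = ep.poll(10)                      # block for at most 10 seconds
for fd, event in events:                  # e.g. [(4, 1)]: fd 4 raised EPOLLIN
    print(fd, bool(event & select.EPOLLIN))
ep.unregister(sk.fileno())
ep.close()
sk.close()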

Events:

    EPOLLIN       = 1           ----available for read
    EPOLLPRI      = 2           ----urgent data available for read
    EPOLLOUT      = 4           ----available for write
    EPOLLERR      = 8           ----an error condition occurred
    EPOLLHUP      = 16          ----hang-up occurred
    EPOLLRDNORM   = 64          ----equivalent to EPOLLIN
    EPOLLRDBAND   = 128         ----priority data band can be read
    EPOLLWRNORM   = 256         ----equivalent to EPOLLOUT
    EPOLLWRBAND   = 512         ----priority data may be written
    EPOLLMSG      = 1024        ----ignored
    EPOLLONESHOT  = 1073741824  ----one-shot behavior: after one event is pulled out, the fd is internally disabled
    EPOLLET       = 2147483648  ----level-triggered by default; set this flag for edge-triggered

Level-triggered vs. edge-triggered:

Level-triggered (sometimes called condition-triggered): when a readable or writable event occurs on a watched file descriptor, epoll.poll() tells the handler to read or write. If you do not drain all the data in one go (say the read/write buffer is too small), the next call to epoll.poll() will report the same descriptor again, and it keeps reporting it for as long as you fail to act. If the system holds many ready descriptors you do not actually need to service, each call returns all of them, which greatly slows the handler's search for the ready descriptors it does care about. The upside is obvious: it is stable and reliable.

Edge-triggered (sometimes called state-triggered): when a readable or writable event occurs on a watched file descriptor, epoll.poll() notifies the handler exactly once. If you do not drain all the data this time, the next epoll.poll() will not remind you; you are only notified again when a second event occurs on that descriptor. This mode is more efficient than level triggering, since the system is not flooded with ready descriptors you do not care about, but under some conditions it is less forgiving.
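The practical consequence of edge triggering is the drain-until-EAGAIN loop; a sketch, assuming conn is a non-blocking socket registered with EPOLLET:

# Edge-triggered read: keep reading until the kernel reports "would block",
# otherwise data left in the buffer will never be reported again.
import socket

chunks = []
while True:
    try:
        chunk = conn.recv(4096)
    except socket.error:  # EAGAIN/EWOULDBLOCK: the buffer is drained
        break
    if not chunk:         # empty read: the peer closed the connection
        break
    chunks.append(chunk)
data = b''.join(chunks)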

#!/usr/bin/python
# coding:utf-8
import select
import socket
import time

EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'

sk = socket.socket()
sk.bind(('192.168.110.100', 8080))
sk.listen(5)
sk.setblocking(0)  # non-blocking mode
epoll = select.epoll()  # create an epoll object
epoll.register(sk.fileno(), select.EPOLLIN)  # watch the listening socket for read events (a new connection arrives as a read event)
try:
    connections = {}; requests = {}; responses = {}
    while True:
        events = epoll.poll()  # block until an event we care about occurs
        for fileno, event in events:  # events is a list of (fileno, event mask) tuples; fileno is the file descriptor, an integer
            if fileno == sk.fileno():  # event on the server socket: accept the new connection
                connection, address = sk.accept()
                connection.setblocking(0)  # make the client socket non-blocking
                epoll.register(connection.fileno(), select.EPOLLIN)  # register the socket's read (EPOLLIN) event
                connections[connection.fileno()] = connection  # remember the connection
                requests[connection.fileno()] = b''  # data received so far
                responses[connection.fileno()] = response  # data to send back
            elif event & select.EPOLLIN:  # a read event occurred
                requests[fileno] += connections[fileno].recv(1024)  # read from the client
                if EOL1 in requests[fileno] or EOL2 in requests[fileno]:  # end-of-request marker: the message is complete
                    epoll.modify(fileno, select.EPOLLOUT)  # full HTTP request received: stop watching reads, watch writes (EPOLLOUT fires when we can send data back)
                    print('-' * 40 + '\n' + requests[fileno].decode()[:-2])
            elif event & select.EPOLLOUT:  # a write event on a client socket: we may send data
                byteswritten = connections[fileno].send(responses[fileno])  # send and get the number of bytes written
                responses[fileno] = responses[fileno][byteswritten:]  # slice off what was sent; an empty remainder means done
                if len(responses[fileno]) == 0:  # everything has been sent
                    epoll.modify(fileno, 0)  # all response data sent: stop watching read and write events
                    connections[fileno].shutdown(socket.SHUT_RDWR)
            elif event & select.EPOLLHUP:  # the client hung up
                epoll.unregister(fileno)  # stop watching this fd
                connections[fileno].close()  # close the connection
                del connections[fileno]
finally:
    epoll.unregister(sk.fileno())
    epoll.close()
    sk.close()

Server

#!/usr/bin/python
# coding:utf-8
import socket

obj = socket.socket()
obj.connect(('192.168.110.100', 8080))
obj.sendall(b'hello\n\r\n')
print(obj.recv(1024))
obj.close()

Client

Practical code:

#!/usr/bin/python
# coding:utf-8
import select
import socket
import sys
import Queue
import time
import threading
import logging
import datetime
import re, os
import hashlib
import multiprocessing

sys.path.append('../')
from SQLdb import SQLdb
from mylog import MyLog as Log
from communication_packet import Communication_Packet, Communication_Packet_Flags, Error_Info_Flags
from encryption import PrpCrypt
import pdb

'''
Constant Meaning
EPOLLIN Available for read
EPOLLOUT Available for write
EPOLLPRI Urgent data for read
EPOLLERR Error condition happened on the assoc. fd
EPOLLHUP Hang up happened on the assoc. fd
EPOLLET Set Edge Trigger behavior, the default is Level Trigger behavior
EPOLLONESHOT Set one-shot behavior. After one event is pulled out, the fd is internally disabled
EPOLLRDNORM Equivalent to EPOLLIN
EPOLLRDBAND Priority data band can be read.
EPOLLWRNORM Equivalent to EPOLLOUT
EPOLLWRBAND Priority data may be written.
EPOLLMSG Ignored.
'''


class Server(object):
    def __init__(self, server_IP=None, server_port=None):
        # def __init__(self,server_address = ('112.33.9.154',11366)):
        '''
        Initialize the server's global data
        '''
        # pdb.set_trace()  # default mode: debug
        self.log = Log()
        self.log.openConsole()  # enable console output
        self.logger = self.log.getLog()
        self.dbname = 'sync_test'
        # By default, use the local host IP and port 11366
        if server_IP is None or server_port is None:
            if server_IP is None:
                try:
                    self.server_IP = self.getlocalIP()
                    self.logger.info('Current server_IP: %s' % self.server_IP)
                except:
                    self.logger.critical('Get server IP Error!')
                    raise
            if server_port is None:
                self.server_port = 11366
        else:
            self.server_IP = server_IP
            self.server_port = server_port
        self.server_address = (self.server_IP, self.server_port)  # the server address
        self.ListenNum = 100          # maximum number of monitored socket connections
        self.connections = {}         # current connections
        self.requests = {}            # request data per connection
        self.addresses = {}           # client addresses
        self.errorInfo = {}           # error info; on error it is sent back to the client
        self.responseInfo = {}
        self.readthreadRecord = {}
        self.lock = threading.Lock()  # thread lock for data synchronization
        self.db = SQLdb()             # database handle used for DB synchronization
        self.setDB('localhost', 'root', '', 'sync_test')
        self.readthreadlock = threading.Lock()
        self.EOF = '\n\r\n'
        self.servernum = 'serverxxxxx'
        self.key = '91keytest'
        # set communication user id
        Communication_Packet.set_userid(self.servernum)
        self.encryption = PrpCrypt()

    def setServerAddree(self, server_ip, server_port):
        '''
        Set server address
        '''
        self.server_address = (server_ip, server_port)

    def setDB(self, host=None, username=None, password=None, dbname=None):
        self.db = SQLdb(host, username, password, dbname)

    def getlocalIP(self):
        '''
        Get the address of the first NIC to use as the bind IP
        '''
        try:
            s = os.popen('ifconfig').read()
        except:
            raise
        else:
            ip = re.findall('inet addr:(?<![\.\d])(?:\d{1,3}\.){3}\d{1,3}(?![\.\d])', s)[0].split(':')[1]
            return ip

    def __init_server(self):
        '''
        Initialize the server socket and the epoll monitoring object
        '''
        try:
            # pdb.set_trace()
            # Create a TCP/IP socket
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            # make the socket non-blocking
            self.server.setblocking(0)
            # Bind the socket to the port
            self.logger.info('starting up on %s port %s' % self.server_address)
            try:
                self.server.bind(self.server_address)
            except:
                echo = os.popen('''lsof -i :11366 |grep "(LISTEN)" | awk '{printf($1)}' ''').read()
                print 'Port %s is occupied by process %s!' % (self.server_port, echo)
                self.logger.error('Bind on %s port %s Error!' % self.server_address)
                raise
            # Listen for incoming connections
            # self.server.listen(self.ListenNum)
            self.server.listen(1)
            # Set up the epoll object
            self.epoll = select.epoll()
            self.epoll.register(self.server.fileno(), select.EPOLLIN | select.EPOLLET)  # register the read event for the server socket, edge-triggered
        except:
            raise Exception('__init_server Error')

    def __jionthreads(self):
        '''
        Join the worker threads
        '''
        # self.logger.debug('Current threadtast is %d ' % len(threading.enumerate()))
        main_thread = threading.currentThread()
        for t in threading.enumerate():
            if t is main_thread:
                continue
            else:
                t.join(0)  # non-blocking join
        # self.logger.debug('After joined the threads... %d ' % len(threading.enumerate()))

    def __format_str(self, businessFlag, data, endFlag=True, errorFlag=False, hasnewconf=False, versioninfo=''):
        '''
        Format the data to send
        '''
        formatstr = {'BUSINESS_TYPE': businessFlag, 'DATA': data, 'ENDFLAG': endFlag, 'ERRORFLAG': errorFlag,
                     'HASNWECONF': hasnewconf, 'VERSIONINFO': versioninfo}
        return str(formatstr) + self.EOF

    def get_table_filed(self, table_name, db, lock, db_name):
        # pdb.set_trace()
        # get the table fields from the db
        query_detection_version_field_sql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s' and TABLE_SCHEMA = '%s';" % (
            table_name, db_name)
        with self.lock:
            detection_version_fields = self.db.fechdb(query_detection_version_field_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            # Record the error and end the task
            DB_ERROR = False
            self.logger.error('----Get %s fields Error! End this task-----' % table_name)
            return
        else:
            # the query result is Unicode, so we need to encode to utf-8
            table_field = [field[0].encode('utf-8') for field in detection_version_fields]
            return table_field

    def calc_md5(self, data):
        return hashlib.md5(data).hexdigest()

    def validating_message_token(self, receving_data):
# print receving_data
# pdb.set_trace()
# print receving_data
print len(receving_data)
pre_md5 = receving_data[:16]
suffix_md5 = receving_data[-16:]
message_md5 = pre_md5 + suffix_md5
message = receving_data[16:-16]
cur_md5 = self.calc_md5(message)
print cur_md5, message_md5
if message_md5 == cur_md5:
return True, message
else:
return False, message
pass def validating_content_token(self, content):
receive_content = content['Business']['Content']
if not isinstance(receive_content, str):
receive_content = str(receive_content)
receive_md5 = content['Info_Status']['Security_Token']
receive_time = content['Info_Status']['Time']
cur_md5 = self.calc_md5(receive_content + receive_time + self.key)
if cur_md5 == receive_md5:
return True
else:
return False
pass def packaging_message(self, Message=None, Error=None, Info_Status=None):
# ----Pack send message
# Init Communication_Packet
cm_packet = Communication_Packet() # def set_content_Business(self,b_type,b_content,table_name = None,table_field = None,b_is_syncdb = True):
cm_packet.set_content_Business(targv=Message) # def set_content_Error(self,error_flag = False,error_type = None,error_info = None):
cm_packet.set_content_Error(targv=Error) now_time = str(datetime.datetime.now()) # get current time
# Business+time+key calculate token,calculate token
security_token = self.calc_md5(str(cm_packet.CMC_Business) + now_time + self.key)
# def set_content_Info_Status(self,info_type,security_token,time,is_end):
# Need to replace the security_token
Info_Status = list(Info_Status) # 转化元组到列表
Info_Status[1] = security_token
Info_Status[2] = now_time
Info_Status = tuple(Info_Status) # 重新转化列表到元组作为参数 cm_packet.set_content_Info_Status(targv=Info_Status)
try:
send_data = cm_packet.content
except Exception, e:
raise e
else:
# we Encryption data
self.logger.debug(type(send_data))
encryption_send_data = self.encryption.encrypt(str(send_data))
# caculate md5
encrypt_send_md5 = self.calc_md5(encryption_send_data)
complete_send_data = encrypt_send_md5[:16] + encryption_send_data + encrypt_send_md5[-16:] + self.EOF
return complete_send_data def unpackaging_message(self, unpacking_str):
# pdb.set_trace()
if not isinstance(unpacking_str, str):
raise exceptions.ValueError
else:
unpacking_str = unpacking_str.strip(self.EOF)
flag, message = self.validating_message_token(unpacking_str)
if flag:
decrypt_str = self.encryption.decrypt(message)
try:
message_dict = eval(decrypt_str)
except Exception, e:
self.logger.error('Eval decrypt_str Error!')
raise e
else:
if self.validating_content_token(message_dict):
return message_dict
else:
self.logger.error('Message is tampered!')
return None
pass
else:
self.logger.error('Message is tampered!')
return None def init_detection_nums(self):
# get detect_point_nums from server db
# pdb.set_trace()
query_sql = "select distinct(sync_point_no) from sync_control"
BD_ERROR = False
with self.lock:
point_nums = self.db.fechdb(query_sql)
BD_ERROR = self.db.Error
if BD_ERROR:
self.logger.error('-----get detect_point_nums error-----') if point_nums:
nums_list = [p[0] for p in point_nums]
return nums_list
else:
return None def verification_sync_point_no(self, detection_num):
detect_point_nums = self.init_detection_nums()
if not detect_point_nums:
self.logger.error('-----db config error!-----')
raise
if detection_num in detect_point_nums:
return True
else:
return False def read_sync_contol_configure(self, table_name, sync_type, sync_point_no, sync_user='server'):
# Read the control configuration from loacal db,if have not,we sync it from server,then read it again
# pdb.set_trace()
qeury_sql = "select * from sync_control where sync_table = '%s' and sync_type = '%s' and sync_user = '%s' and sync_point_no = '%s';" % (
table_name, sync_type, sync_user, sync_point_no)
DB_ERROR = False # set table name
sync_table_name = 'sync_control' # get `sync_control` table fields
table_field = self.get_table_filed(sync_table_name, self.lock, self.db, self.dbname)
if not table_field:
self.logger.error('----------' % table_name)
return None with self.lock:
control_configure = self.db.fechdb(qeury_sql)
DB_ERROR = self.db.Error
if DB_ERROR:
self.logger.error('-----Get control configure Error!-----')
return None
if control_configure:
# Get the configure from db! and On the basis of classification of table name and sync type(uploat or download)
# format the configure to a list
control_configure_list = []
for iter_conf in control_configure:
control_configure_item = dict.fromkeys(table_field)
lenth = len(table_field)
# set value for everyone key
for i in range(lenth):
val = iter_conf[i]
if isinstance(val, unicode):
val = val.encode('utf-8')
control_configure_item[table_field[i]] = val
control_configure_list.append(control_configure_item)
return control_configure_list def parsing_config(self, config):
# parsing the config to get the sql
# may modify the logic of the code
# pdb.set_trace()
p_conf = dict()
table_name = config['sync_table'] # Get the download table name
p_conf['table_name'] = table_name
table_field = config['sync_field'] # Get sync table field!
p_conf['sync_type'] = config['sync_type']
p_conf['sync_range'] = config['sync_range']
p_conf['sync_range_value'] = config['sync_range_value']
p_conf['sync_is_update_time'] = config['sync_is_update_time'] # if table_field is null,we need sync all the field!
if not table_field:
table_field = self.get_table_filed(table_name, self.lock, self.db, self.dbname)
if not table_field:
self.logger.error(
'-----Terminate this task,becase of getting the %s table fileds fialed!-----' % table_name)
return
p_conf['table_field'] = table_field
# Get this operation's type
try:
sql_operations = eval(config['sync_operation_type'])
except Exception, e:
self.logger.error('-----get sync_operation_type error!-----')
return upside_operate = sql_operations['upside'] # if have, this download operation need carry db info to the server!
p_conf['upside_operate'] = upside_operate
downside_operate = sql_operations['downside'] # how to handle the downloaded db info!
p_conf['downside_operate'] = downside_operate
update_state_operate = sql_operations['update_state']
p_conf['update_state_operate'] = update_state_operate # Get the sync sql of the corresponding operation
try:
sqls = eval(config['sync_sql'])
except Exception, e:
self.logger.error('-----get sync_sql error!-----')
raise upside_sqls = sqls['upside'] # a tuple or None
p_conf['upside_sqls'] = upside_sqls downside_sqls = sqls['downside'] # a tuple or None
p_conf['downside_sqls'] = downside_sqls update_state_sqls = sqls['update_state'] # a tuple or None
p_conf['update_state_sqls'] = update_state_sqls # Get the sync patch field of the corresponding operation
try:
if config['sync_patch_field']:
patch_fields = eval(config['sync_patch_field'])
else:
pass
except Exception, e:
self.logger.error('-----get sync_field error!-----')
raise upside_fields = patch_fields['upside'] # a tuple or None
p_conf['upside_fields'] = upside_fields downside_fields = patch_fields['downside'] # a tuple or None
p_conf['downside_fields'] = downside_fields update_state_fields = patch_fields['update_state'] # a tuple or None
p_conf['update_state_fields'] = update_state_fields # Get the sync_field_value of the corresponding operation
try:
if config['sync_patch_field_value']:
patch_field_values = eval(config['sync_patch_field_value'])
else:
pass
except Exception, e:
self.logger.error('-----get sync_field_value error!-----')
return upside_patch_field_values = patch_field_values['upside'] # a tuple or None
p_conf['upside_patch_field_values'] = upside_patch_field_values downside_patch_field_values = patch_field_values['downside'] # a tuple or None
p_conf['downside_patch_field_values'] = downside_patch_field_values update_state_patch_field_values = patch_field_values['update_state'] # a tuple or None
p_conf['update_state_patch_field_values'] = update_state_patch_field_values is_carry_state = config['sync_is_carry_state']
p_conf['is_carry_state'] = is_carry_state is_update_state = config['sync_is_update_state']
p_conf['is_update_state'] = is_update_state is_use_state_carry_data = config['sync_is_use_state_carry_data']
p_conf['is_use_state_carry_data'] = is_use_state_carry_data return p_conf def __proxy(self, FileNo):
        # start a new thread to handle the write event
newthread = threading.Thread(target=self.__handler_write_event, args=(FileNo,))
# newthread.daemon = True
        newthread.start()

    def __delevnet(self, FileNo):
        '''
        Unregister events we no longer care about and release the associated resources
        '''
self.logger.info('Start to unregister and close %s socket... ' % FileNo)
self.epoll.unregister(FileNo)
self.connections[FileNo].close()
del self.connections[FileNo]
del self.requests[FileNo]
del self.addresses[FileNo]
del self.errorInfo[FileNo]
self.logger.info('unregistered and closed the %s socket! ' % FileNo) # def __read_from_socket(self,FileNo):
# '''
# Read data from socket
# '''
# if self.requests[FileNo]:
# return False
# else:
# try:
# while True:
# tmpdata = self.connections[FileNo].recv(4096)
# if not tmpdata:
# return True
# self.requests[FileNo] += tmpdata
# self.logger.debug(len(tmpdata))
# #print tmpdata
# except socket.error:
# return True
# except Exception,e:
# raise e
def __read_from_socket(self, FileNo):
'''
Read data from socket
'''
try:
while True:
tmpdata = self.connections[FileNo].recv(4096)
# print 'tmpdata: %s' % tmpdata
                # Python has no EPOLLRDHUP; when the client closes first, or hits Ctrl+C before sending data,
                # the server gets an EPOLLIN event and the read from the socket returns empty... no other workaround found!
if not tmpdata:
break
self.requests[FileNo] += tmpdata
# self.logger.debug(len(tmpdata))
except socket.error:
pass
except Exception, e:
raise e # error_flag = False,error_type = None,error_info = None
def __deal_business(self, FileNo):
        # handle the client's business according to the received data
# pdb.set_trace()
try:
message = self.unpackaging_message(self.requests[FileNo])
# we need reset the requests info
self.requests[FileNo] = ''
# self.logger.debug(message)
except:
            # set the error flag and record the error info
self.logger.error('unpackaging_message Error!')
self.errorInfo[FileNo] = (Error_Info_Flags.Receive_Data_Error, 'Server recieved data Error!')
return
else:
if message:
business = message['Business']
client_id = message['userid']
error_info = message['Error']
info_states = message['Info_Status']
verification_result = self.verification_sync_point_no(client_id)
if verification_result:
# Here we handle the business
# 1、get config from the db
# read_sync_contol_configure(self,table_name,sync_type,sync_point_no,sync_user):
try:
table_name = business['Table_Name']
sync_type = business['Type']
is_sync_flag = business['Is_Syncdb']
except:
self.errorInfo[FileNo] = (
Error_Info_Flags.Receive_Data_Error, 'Business information is incomplete!')
return
if sync_type == Communication_Packet_Flags.DOLOAD_DB:
s_type = 'download'
else:
s_type = 'upload'
b_config = self.read_sync_contol_configure(table_name, s_type, client_id)
# pdb.set_trace()
if b_config:
p_config = [self.parsing_config(conf) for conf in b_config]
self.logger.debug(p_config)
else:
pass
if b_config:
self.real_business_processing_functions(FileNo, p_config, business, info_states, error_info)
else:
# set error info!
self.errorInfo[FileNo] = (
                        Error_Info_Flags.Server_config_Error, 'Server config is None, give up this task!')
                else:
                    # user authentication failed
self.logger.error('-----User authentication failed! userid: %s-----' % client_id)
self.errorInfo[FileNo] = (Error_Info_Flags.User_Certification_Error, 'User authentication failed!')
else:
                # if there is no message, information authentication failed!
self.logger.error('-----Clinet\'s Information authentication failed!-----')
self.errorInfo[FileNo] = (
Error_Info_Flags.Info_Certification_Error, 'Information authentication failed!') def calculate_time(self, time_type, time_value):
# maybe is str,we need to convert it to int type
time_value = int(time_value) # get current time as the end time
cur_time = datetime.datetime.now()
hours = 0
if time_type == 'hour':
hours = time_value * 24
elif time_type == 'day':
hours = time_value * 24
elif time_type == 'week':
hours = time_value * 24 * 7
elif time_type == 'month':
hours = time_value * 24 * 30
else:
self.logger.error('-----time_type Error!-----')
return None
# caculate the start time
start_time = cur_time - datetime.timedelta(hours=hours)
return (start_time, cur_time) # handle the bussiness from the client
def real_business_processing_functions(self, FileNo, business_config, business, info_states, error_info):
# pdb.set_trace()
# according to the config we handle the business
business_config = business_config[0]
if info_states['Info_Type'] == Communication_Packet_Flags.REQEST:
# get bussiness type
request_bussiness_type = business['Type']
if request_bussiness_type == Communication_Packet_Flags.UPLOAD_DB:
request_bussiness_type = 'upload'
elif request_bussiness_type == Communication_Packet_Flags.DOLOAD_DB:
request_bussiness_type = 'download'
else:
self.errorInfo[FileNo] = (
Error_Info_Flags.Client_Data_Pack_Error, 'Request business type error %s' % request_bussiness_type)
return loc_config_sync_type = business_config['sync_type']
if request_bussiness_type == loc_config_sync_type: is_carry_state = business_config['is_carry_state']
is_use_state_carry_data = business_config['is_use_state_carry_data']
is_update_state = business_config['is_update_state']
# handle the download request
if request_bussiness_type == 'download':
# parsing the loacal config
up_sql_list = []
upside_operates = business_config['upside_operate'].split('|')
upside_sqls = business_config['upside_sqls']
upside_fields = business_config['upside_fields']
upside_patch_field_values = business_config['upside_patch_field_values']
sync_range = business_config['sync_range']
sync_range_value = business_config['sync_range_value'] lenth = len(upside_sqls)
for i in range(lenth):
sql_part = upside_sqls[i] # if sync_range is not None,we will ignore the other
if sync_range:
if sync_range == 'period':
t_type, t_value = sync_range_value.split(':')
s_time, e_time = self.calculate_time(t_type, t_value)
qeury_sql = sql_part % (str(s_time), str(e_time))
else:
qeury_sql = sql_part
# add it into the list
up_sql_list.append(qeury_sql)
else:
# we need parsing other configurations
if is_use_state_carry_data:
try:
# [((u'update_time', u'1970-01-01 00:00:00'),)] limk this
qeury_sql = sql_part % business['Content'][0][0][1]
except:
self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Content Error!')
else:
qeury_sql = sql_part
up_sql_list.append(qeury_sql)
query_data = []
for u_sql in up_sql_list:
BD_ERROR = False
with self.lock:
res = self.db.fechdb(u_sql)
BD_ERROR = self.db.Error
if BD_ERROR:
self.errorInfo[FileNo] = (
Error_Info_Flags.Server_DB_Error, 'Server db Error,SQL: %s' % u_sql)
break
else:
query_data.append(res)
self.responseInfo[FileNo] = query_data
# handle the upload request
elif request_bussiness_type == 'upload':
# pdb.set_trace()
# parsing the loacal config
content = business['Content']
try:
self.refresh_the_database(business_config, content)
except Exception, e:
print e
self.errorInfo[FileNo] = (Error_Info_Flags.Server_config_Error, 'Server Config Error!') else:
self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'bussiness type Error!')
else:
self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error,
'server config type is different from client request business type! Error!')
else:
self.errorInfo[FileNo] = (Error_Info_Flags.Client_Data_Pack_Error, 'Communication_Packet_Flags Error!') # update the db
def refresh_the_database(self, handle_config, db_content):
'''
refresh the database,maybe insert、update、delete...
'''
# parsing the handle config
table_name = handle_config['table_name']
table_field = handle_config['table_field']
downside_operate = handle_config['downside_operate']
update_state_operate = handle_config['update_state_operate']
downside_sqls = handle_config['downside_sqls']
update_state_sqls = handle_config['update_state_sqls']
downside_fields = handle_config['downside_fields']
update_state_fields = handle_config['update_state_fields']
downside_patch_field_values = handle_config['downside_patch_field_values']
update_state_patch_field_values = handle_config['update_state_patch_field_values']
is_update_time = handle_config['sync_is_update_time']
# pdb.set_trace() try:
table_field = eval(table_field)
if not table_field:
table_field = self.get_table_filed(table_name, self.lock, self.db, self.dbname)
first_field = table_field[0]
except Exception, e:
self.logger.error('-----eval table_field error,config is error!-----')
raise e
if first_field == 'id':
is_id = True
else:
is_id = False download_oprations = downside_operate.split('|')
if 'file' in download_oprations:
filename = self.createNewBlackListPath()
handle_flag = self.handle_file_func(db_content, filename)
return handle_flag
# table_field = eval(table_field)
try:
is_update_time = int(is_update_time)
except:
self.logger.error('-----is_update_time config value error!-----')
raise
for db_item in db_content:
if is_update_time:
time_index = table_field.index('update_time')
update_time = (str(datetime.datetime.today()).split('.')[0],)
db_item = db_item[:time_index] + update_time + db_item[time_index + 1:]
if is_id:
rowdata = db_item[1:]
else:
rowdata = db_item
# self.logger.debug(rowdata)
# print dict(zip(self.phishing_log_fields,rowdata))
lenth = len(download_oprations)
for oper in download_oprations:
# here we get all the patched field value
# '((fixed,true),(carry,None),(tansfer,None))',
myindex = download_oprations.index(oper)
fields_value = []
# pdb.set_trace()
for i in range(len(downside_patch_field_values[myindex])):
val = downside_patch_field_values[myindex][i]
if val[0] == 'fixed':
pass
elif val[0] == 'carry':
pass
elif val[0] == 'transfer':
field_name = downside_fields[myindex][i]
v_index = table_field.index(field_name)
tf_value = db_item[v_index]
fields_value.append(tf_value)
pass
else:
self.logger.error('-----server downside_patch_field_values Error! valuse: %s------' % str(
downside_patch_field_values))
# pdb.set_trace()
if fields_value:
d_sql, f_val = self.pre_handle_None_value(downside_sqls[myindex],
self.format_field_value(fields_value))
db_sql = self.format_sql(d_sql, f_val)
else:
db_sql = downside_sqls[myindex] # pdb.set_trace()
BD_ERROR = False
with self.lock:
if oper == 'insert':
self.db.insertdb(db_sql)
BD_ERROR = self.db.Error
if oper == 'update':
self.db.updatedb(db_sql)
BD_ERROR = self.db.Error
if oper == 'delete':
self.db.deldb(db_sql)
BD_ERROR = self.db.Error
if not BD_ERROR:
break
else:
continue
else:
return True def format_tuple(self, tup):
'''
It is None if field in DB is NULL when we get the data from db use mysqldb!
Format the None to NuLL for inserting data to DB
'''
vluelist = ['NULL' if t is None else t for t in tup]
padlist = ['%s' if t is None else '\'%s\'' for t in tup]
padstr = ''
for pl in padlist:
padstr += pl
padstr += ','
else:
padstr = padstr[:-1]
return padstr % tuple(vluelist) def format_sql(self, patch_sql, patch_field_value):
if isinstance(patch_sql, str) and isinstance(patch_field_value, tuple):
try:
res_sql = patch_sql % patch_field_value
except:
res_sql = None
return res_sql
else:
self.logger.error('-----formate_sql args type error-----')
raise exceptions.TypeError def format_field_value(self, field_value):
# we neeed hanle the ' or " in the mysql statement
res_list = list()
for val in field_value:
if isinstance(val, unicode):
val = val.encode('utf-8')
if isinstance(val, str):
f_val = val.replace('\'', '\\\'').replace('\"', '\\\"')
else:
f_val = val
res_list.append(f_val)
return tuple(res_list) def get_all_sub_str_index(self, index_str, sub_str, none_indexs):
# print index_str
index_list = []
start_index = 0
cnt = 0
while True:
try:
tmp_index = index_str.index(sub_str, start_index)
except:
break
else:
if cnt in none_indexs:
index_list.append(tmp_index)
start_index = tmp_index + len(sub_str)
cnt += 1
return tuple(index_list) def pre_handle_None_value(self, patch_sql, field_values): # get all the None value index
None_indexs = []
for i in range(len(field_values)):
if field_values[i] is None:
None_indexs.append(i) if None_indexs:
# get '%s' indexs
s_indexs = self.get_all_sub_str_index(patch_sql, "'%s'", None_indexs) str_list = list(patch_sql)
# pdb.set_trace()
subtraction_index = 0
for ix in s_indexs:
print subtraction_index
str_list.pop(ix - subtraction_index)
# print str_list[ix-subtraction_index]
str_list.pop(ix - subtraction_index + 2)
subtraction_index += 2
replace_str = ''.join(str_list)
# pdb.set_trace()
# print replace_str
# pdb.set_trace()
res_field_values = ['NULL' if f_val is None else f_val for f_val in field_values] return replace_str, tuple(res_field_values)
else:
return patch_sql, field_values def __handler_read_event(self, FileNo):
self.logger.info('Start handle the recieved data...')
        # do the business processing on the received data
try:
self.__deal_business(FileNo)
except Exception, e:
self.logger.error('__deal_business Exception: %s' % e)
# self.logger.debug(datetime.datetime.now())
try:
self.epoll.modify(FileNo, select.EPOLLOUT | select.EPOLLET | select.EPOLLONESHOT)
except:
pass
self.logger.error('Deal_business ERRor')
else:
self.modify_revent_to_wevent(FileNo)
self.logger.info('Handle the recieved data End!') # content = dict(sync_point=self.detect_piont_name,sync_point_no = self.detect_piont_serial_number)
# message_info = (Communication_Packet_Flags.DOLOAD_DB,str(content),True,table_name,table_field)
# error_info = (False,None,None)
# message_status = (Communication_Packet_Flags.RESPONSE,None,str(datetime.datetime.now()),True) def __handler_write_event(self, FileNo): # if errorInfo is not null,we send Error to Client else handing write business
Error_Info = None
try:
Error_Info = self.errorInfo[FileNo]
# reset error info to None
self.errorInfo[FileNo] = ''
except:
            # the socket has already been removed from the list; just return
self.logger.info('This socket is removed from error info list!')
return
error_info = (False, None, None)
if Error_Info:
print Error_Info # using debug
error_info = (True, Error_Info[0], Error_Info[1]) response = self.responseInfo[FileNo]
# need reset the response info
self.responseInfo[FileNo] = '' res_info = (None, response, None, None, False, None) info_states = (Communication_Packet_Flags.RESPONSE, None, None, True) message = self.packaging_message(res_info, error_info, info_states)
self.logger.debug(message)
self.send_message(FileNo, message, True) # send the message to client self.modify_wevent_to_revent(FileNo) # modify the event def modify_wevent_to_revent(self, FileNo):
'''
If we trigger the read envet,we use this function
'''
try:
# We need modify event to read event!
self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLIN | select.EPOLLONESHOT)
except:
pass def modify_revent_to_wevent(self, FileNo):
'''
If we trigger the write envet,we use this function
'''
try:
self.epoll.modify(FileNo, select.EPOLLET | select.EPOLLOUT | select.EPOLLONESHOT)
except:
pass def send_message(self, FileNo, message, blocking=True):
# if message is big,use noblocking it will occur error!
# so, we maybe set it to blocking
if blocking:
self.connections[FileNo].setblocking(True)
if FileNo not in self.connections:
self.logger.debug('This socket not in the connections list!')
return
try:
self.connections[FileNo].sendall(message)
except Exception, e:
pass
# last we need to set it to False! we use the noblocking module
if blocking:
        self.connections[FileNo].setblocking(False)

    def start_server(self):
        try:
            self.__init_server()  # initialize the server
        except:
            # server initialization failed
            self.logger.critical('Init server Error...')
            raise
        while True:
            # join finished threads
            self.__jionthreads()
            # Wait for at least one of the sockets to be ready for processing
            self.logger.info('waiting for the next event')
            events = self.epoll.poll()
            for fileno, event in events:
# Handle inputs
if fileno == self.server.fileno():
try:
while True:
connection, address = self.server.accept()
                            connection.setblocking(0)  # set the new connection to non-blocking mode
                            # Here we can not use the select.EPOLLONESHOT flag.
                            self.epoll.register(connection.fileno(),
                                                select.EPOLLIN | select.EPOLLET)  # new connections are likewise edge-triggered
                            self.connections[connection.fileno()] = connection  # remember the connection
                            self.requests[connection.fileno()] = ''  # the business request data
                            self.addresses[connection.fileno()] = address  # the peer address
                            self.errorInfo[connection.fileno()] = ''
                            self.responseInfo[connection.fileno()] = ''
                            # an empty error-info string means no error has occurred
self.logger.info('========================================')
self.logger.info('Client %s:%s connected server' % (address))
except socket.error:
                        pass
                # elif event & (select.EPOLLIN | select.EPOLLONESHOT):
elif event & select.EPOLLIN: # Read data from socket untill data is recieved over!
self.logger.debug('EVENT EPOLLIN: %s' % hex(event))
# pdb.set_trace()
try:
r_flag = self.__read_from_socket(fileno)
except socket.error:
pass
except Exception, e:
# if we catch other Exception, it is to say that we recieved data from client Error!
# We need send error data to client!
                        self.logger.warning('Catch other exception when recieve data!')
                        self.errorInfo[fileno] = (
Error_Info_Flags.Receive_Data_Error, '-----Server recieved data Error!-----')
self.modify_revent_to_wevent(fileno)
else:
# if it has no exception when eval the data, we think that client data is recieved over.
# #then start a new thread to deal with the client data
# if not r_flag:
# print '#################################'
# pass
# else:
if self.requests[fileno]:
# Start a new thread to disposal the client requests
# self.logger.debug(self.requests[fileno])
if self.requests[fileno].endswith(self.EOF):
newthread = threading.Thread(target=self.__handler_read_event, args=(fileno,))
newthread.daemon = True
newthread.start()
print 'start %s' % newthread.name
# print 'print start new thread'
else:
                        # nothing was read from the client: the client has closed, so hang up on our side
# self.logger.info("closing %s %s (HUP)" % self.addresses[fileno])
                        self.__delevnet(fileno)
                # elif event & (select.EPOLLOUT | select.EPOLLONESHOT):
elif event & select.EPOLLOUT:
self.logger.debug('EVENT EPOLLOUT: %s' % bin(event))
# Write event happened,we use proxy to deal
# print 'Current file descripter: %d' % fileno
                    self.__proxy(fileno)  # We need a proxy function here, not a new threading.Thread! With a thread it has bugs (multiple triggered events, I think)
                elif event & select.EPOLLHUP:
self.logger.debug('EVENT EPOLLHUP: %s' % bin(event))
# Client hung up, del event!
self.logger.info("closing %s %s (HUP)" % self.addresses[fileno])
                    self.__delevnet(fileno)
                elif event & select.EPOLLERR:
                    # self.logger.debug('EVENT: %s' % event)
                    self.logger.info(" exception on %s" % self.connections[fileno].getpeername())
                    self.__delevnet(fileno)
                else:
# self.logger.debug('EVENT: %s' % bin(event))
# Other event,do not handle
                    pass


if __name__ == '__main__':
# pdb.set_trace()
myserver = Server()
myserver.start_server()

Server

#!/usr/bin/env python
# coding:utf-8
import socket
import sys, os
import threading
import time
import logging
import multiprocessing
import random
import datetime
import hashlib
# hashlib.md5(open(fileName,'rb').read()).hexdigest()
import pdb
import exceptions

sys.path.append('../')
# import the task scheduling module
from apscheduler.schedulers.blocking import BlockingScheduler
from sqldb import SQLdb
from mylog import MyLog as Log
from communication_packet import Communication_Packet


class Client(object):
    # Handle message types
    HANDLE_GENERAL = 1       # handle an ordinary response packet
    HANDLE_INSERT = 2        # insert all of the transferred data into the database
    HANDLE_UPDATE = 3        # update with the transferred data; the update conditions must be supplied
    HANDLE_INERT_UPDATE = 4
    HANDLE_FILE = 5

    def __init__(self, IP='112.33.9.154', Port=11366, blackdir=None, db=None):
self.tasksched = BlockingScheduler() # 任务调度器
self.serverIP = IP # 设置服务器IP
self.serverPort = Port # 设置服务器端口
if db is not None:
self.db = db
else:
self.db = SQLdb() # 初始化数据库用于数据库同步 self.lock = threading.Lock() # 使用默认模式:debug模式
self.log = Log()
self.log.openConsole() # 打开控制端输出
self.logger = self.log.getLog() self.EOF = '\n\r\n' # Set EOF flag if blackdir is None:
self.basepath = './blacklist/'
else:
self.basepath = blackdir self.sysdbFirstFlag = False
self.key = None # using to calculate token
self.encryption = None
self.detect_piont_name = 'xxxx'
self.detect_piont_serial_number = 'xxxxxx' def set_DB(self, host=None, username=None, password=None, dbname=None):
self.db = SQLdb(host, username, password, dbname) def set_first_synclog_flag(self, flag):
self.synclog_flag = flag def setBlacklistDir(self, filedir=None):
# Set blacklist dir
if filedir is None:
self.basepath = './blacklist/'
else:
self.basepath = filedir def createNewBlackListPath(self): # blacklistdir if not exists,create it
if os.path.exists(self.basepath):
pass
else:
try:
os.mkdir(self.basepath)
except:
raise
nowtime = datetime.datetime.now().strftime('%Y_%b_%d_%H_%M_%S')
filename = 'blacklist_' + nowtime + '.txt' # 根据文件扩展名
filepath = self.basepath + filename
return filepath def handle_file_func(self, content, filename):
try:
content_data = eval(content)
except Exception, e:
self.logger.error('-----handle_file_func: eval business_content error!-----')
return False
else:
# Open file for write data
try:
w_file = file(filename, 'w')
for data_item in content_data:
w_file.write(str(data_item))
else:
w_file.close()
except Exception, e:
self.logger.error('-----handle_file_func: write data to file Error!------')
return False
else:
return True def reap(self):
# 回收可回收的进程,使用多进程的时候调用...可能不用
while True:
try:
result = os.waitpid(-1, os.WNOHANG)
if not result[0]: break
except:
break
self.logger.info("reaped child process %d" % result[0]) def __connect_server(self, IP, Port):
'''
连接远程服务器反正通讯套接字
'''
# Creat a TCP/IP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Connet the socket to the port where the server is listening
server_address = (IP, Port)
self.logger.info('connecting to %s port %s' % server_address)
try:
sock.connect(server_address)
except:
self.logger.error('connecting to %s port %s error!' % server_address)
raise
return sock def __get_tasks(self): # Get task from db,return an list def scheduler_tasks(self, task_list=None):
# start scheduler to sched the tasks
pass def calculateTime(self, starttime=None, intervalMinutes=None):
if not starttime:
nowtime = datetime.datetime.now()
else:
nowtime = starttime
if intervalMinutes:
interval = datetime.timedelta(minutes=intervalMinutes)
return nowtime - interval
else:
            return nowtime

    def calc_md5(self, data):
        return hashlib.md5(data).hexdigest()

    def validating_message_token(self, receving_data):
pre_md5 = receving_data[:16]
suffix_md5 = receving_data[-16:]
message_md5 = pre_md5 + suffix_md5
message = receving_data[16:-16]
cur_md5 = self.calc_md5(message)
if message_md5 == cur_md5:
return True, message
else:
return False, message
pass def read_sync_state(self):
qeury_sql = "select * from sync_state;"
DB_ERROR = False
with self.lock:
sync_status = self.db.fechdb(qeury_sql)
DB_ERROR = self.db.Error
if DB_ERROR:
raise
if sync_status:
sync_status_dict = dict()
for sync_s in sync_status:
sync_status_dict[sync_s[0]] = sync_s
return sync_status_dict
else:
return None def read_sync_contol_configure(self, read_flag=False):
# Read the control configuration from loacal db,if have not,we sync it from server,then read it again qeury_sql = "select * from sync_control;"
DB_ERROR = False # set table name
table_name = 'sync_control' # get `sync_control` table fields
table_field = self.get_table_filed(table_name, self.lock, self.db)
if not table_field:
self.logger.error('----------' % table_name)
return None with self.lock:
control_configure = self.db.fechdb(qeury_sql)
DB_ERROR = self.db.Error
if DB_ERROR:
self.logger.error('-----Get control configure Error!-----')
return None
if control_configure:
# Get the configure from db! and On the basis of classification of table name and sync type(uploat or download)
# format the configure to a list
control_configure = []
for iter_conf in control_configure:
control_configure_item = dict.fromkeys(table_field)
lenth = len(table_field)
# set value for everyone key
for i in range(lenth):
control_configure_item[table_field[i]] = iter_conf[i]
control_configure.append(control_configure_item)
return control_configure
else:
# we need get the configuration from the server! reload the configuration!
if read_flag: # if we read it again and no configure,return
return None # sysnc the sync_control table from the server!
self.logger.info('=====Start to init the sync control table from Server...=====')
try:
socket = self.__connect_server(self.serverIP, self.serverPort) # 连接服务器
except Exception, e:
# raise Exception
raise e # we need carry the detect point number to server!
content = dict(sync_point=self.detect_piont_name, sync_point_no=self.detect_piont_serial_number) message_info = (Communication_Packet.DOLOAD_DB, str(content), True, table_name, table_field)
error_info = (False, None, None)
message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True) send_message = self.packaging_message(message_info, error_info, message_status)
try:
socket.sendall(send_message)
except Exception, e:
# 发送出现错误则直接返回终止此次业务请求
self.logger.error('-----When send the gain_newconfig_from_server message request to server Error!-----')
return
self.download_db(socket, self.HANDLE_INSERT)
self.logger.info('=====End init the sync control table from Server!=====') # After get control table we need read it from the local table again!
self.read_sync_contol_configure(True) def start_tasks(self):
control_config = self.read_sync_contol_configure()
thread_list = []
if control_config:
for config in control_config:
newthread = threading.Thread(target=self.thread_task, args=(config))
newthread.setDaemon = True
newthread.start()
thread_list.append(newthread)
for t in thread_list:
t.join()
else:
self.logger.error('-----init the sync control configuration error!-----')
raise def thread_task(self, task_config):
# init an instance of sheduler
my_scheduler = BlockingScheduler()
# self.tasksched.add_job(self.synchronous_DB, 'interval', minutes = 15,max_instances = 1)
'''
--cron--
Parameters:
year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)
'''
if task_config['sync_type'] == 'upload':
if task_config['sync_time_mode'] == 'period':
period_minutes = task_config['sync_time_value']
my_scheduler.add_job(self.syn_upload_db, 'interval', minutes=15, max_instances=1, args=task_config)
my_scheduler.start()
elif task_config['sync_time_mode'] == 'fixed_time':
try:
hours, minutes, seconds = task_config['sync_time_value'].split(':')
except:
self.logger.error('-----get sync_time_value Error!-----')
raise e
else:
my_scheduler.add_job(self.syn_upload_db, 'cron', year='*', month='*', day='*', hour=hours,
minute=minutes, second=seconds, max_instances=1, args=task_config)
my_scheduler.start()
else:
self.logger.error('----sysnc control config error-----')
raise
elif task_config['sync_type'] == 'download':
if task_config['sync_time_mode'] == 'period':
period_minutes = task_config['sync_time_value']
my_scheduler.add_job(self.sync_download_db, 'interval', minutes=15, max_instances=1, args=task_config)
my_scheduler.start()
elif task_config['sync_time_mode'] == 'fixed_time':
try:
hours, minutes, seconds = task_config['sync_time_value'].split(':')
except:
self.logger.error('-----get sync_time_value Error!-----')
raise e
else:
my_scheduler.add_job(self.sync_download_db, 'cron', year='*', month='*', day='*', hour=hours,
minute=minutes, second=seconds, max_instances=1, args=task_config)
my_scheduler.start()
else:
self.logger.error('----sysnc control config error-----')
raise
else:
self.logger.error('----sync_type error-----')
raise def sync_download_db(self, config):
# parsing the config to get the sql
pass def syn_upload_db(self, config):
pass def validating_content_token(self, content):
receive_content = content['Business']['Content']
receive_md5 = content['Info_Status']['Security_Token']
receive_time = content['Info_Status']['Time']
cur_md5 = self.calc_md5(receive_content + receive_time + self.key)
if cur_md5 == receive_md5:
return True
else:
return False
pass # encrypt decrypt
def packaging_message(self, Message=None, Error=None, Info_Status=None):
# ----Pack send message
# Init Communication_Packet
cm_packet = Communication_Packet() # def set_content_Business(self,b_type,b_content,table_name = None,table_field = None,b_is_syncdb = True):
cm_packet.set_content_Business(targv=Message) # def set_content_Error(self,error_flag = False,error_type = None,error_info = None):
cm_packet.set_content_Error(Error) now_time = str(datetime.datetime.now()) # get current time
# Business+time+key calculate token,calculate token
security_token = self.calc_md5(str(cm_packet.CMC_Business) + now_time + self.key)
# def set_content_Info_Status(self,info_type,security_token,time,is_end):
# Need to replace the security_token
Info_Status = list(Info_Status) # 转化元组到列表
Info_Status[1] = security_token
Info_Status = tuple(Info_Status) # 重新转化列表到元组作为参数 cm_packet.set_content_Info_Status(targv=Info_Status)
try:
send_data = cm_packet.content
except Exception, e:
raise e
else:
# we Encryption data
encryption_send_data = self.encryption.encrypt(send_data)
# caculate md5
encrypt_send_md5 = self.calc_md5(encryption_send_data)
complete_send_data = encrypt_send_md5[:16] + encryption_send_data + encrypt_send_md5[-16:] + self.EOF
return complete_send_data def unpackaging_message(self, unpacking_str):
if not isinstance(unpacking_str, str):
raise exceptions.ValueError
else:
unpacking_str = unpacking_str.strip(self.EOF)
flag, message = self.validating_message_token(unpacking_str)
if flag:
decrypt_str = self.encryption.decrypt(message)
try:
message_dict = eval(decrypt_str)
except Exception, e:
self.logger.error('Eval decrypt_str Error!')
raise e
else:
if self.validating_content_token(message_dict):
return message_dict
else:
self.logger.error('Message is tampered!')
return None
pass
else:
self.logger.error('Message is tampered!')
return None
        pass

    def recieve_packet(self, communication_socket):
        # receive data from the server
        recieveData = ''
        partdata = ''
        try:
            while partdata is not None:
                try:
                    partdata = communication_socket.recv(4096)
                except Exception, e:
                    self.logger.error(str(e))
                    break
                else:
                    if not partdata:  # empty read: the server closed the connection
                        break
                    recieveData += partdata
                    if partdata.endswith(self.EOF):
                        break
            recieveData = recieveData.rstrip(self.EOF)
            return recieveData
        except Exception, e:
            raise e

    def insertdb_updatedb(self, handle_type, db_content, table_name, table_field, filter_condition=None):
'''
add
'''
try:
db_data = eval(db_content)
except Exception, e:
self.logger.error('-----eval business_content error!-----')
raise e
else:
try:
first_field = eval(table_field)[0]
except Exception, e:
self.logger.error('-----eval table_field error!-----')
raise e
if first_field == 'id':
is_id = True
else:
is_id = False
for db_item in db_data:
if is_id:
rowdata = db_item[1:]
else:
rowdata = db_item
self.logger.debug(rowdata)
# print dict(zip(self.phishing_log_fields,rowdata))
if handle_type == self.HANDLE_INSERT or handle_type == self.HANDLE_INERT_UPDATE:
insert_sql = 'insert into %s %s values(%s);' % (
table_name, table_field.replace('\'', ''), self.format_tuple(rowdata)) # print insert_sql
DB_ERROR = False
with self.lock:
self.db.insertdb(insert_sql)
DB_ERROR = self.db.Error
if DB_ERROR:
if handle_type == self.HANDLE_INERT_UPDATE: pass
elif handle_type == self.HANDLE_INSERT:
pass
else:
pass
elif handle_type == self.HANDLE_UPDATE:
# update the data
pass
else:
pass else:
return True
pass def set_filter(self, filter, table_field, filter_key, filter_relationship):
'''
Func: set the filter for update the db when when handling the message if the handle_flag is set HANDLE_INERT_UPDATE!
Note: all the filter condithion are needed to be ordered!
table_field: filter condithion field
filter_key: = >= <= != if has like,like|>=... And so on
filter_relationship: and、or、None
'''
if not filter:
filter = dict(filter=[])
else:
if 'filter' not in filter.keys():
filter['filter'] = []
filter['filter'].append(table_field, filter_key, filter_relationship) return filter def set_update_field(self, filter, update_field):
'''
Set the update fileds when update db!
update_field:update filed list!
'''
filter['update_field'] = update_field def parsing_filter(self, filter, table_name, table_fields, table_values, filter_field_values=None):
'''
Func: return the update sql
'''
update_values = []
update_fields = filter['update_field']
for field in update_fields:
v_index = table_fields.index(field)
up_value = table_values[v_index]
update_values.append(up_value) update_sql_part_one = 'update %s set ' % table_name
lenth = len(update_values)
for i in range(lenth):
up_value = update_values[i]
up_field = update_fields[i]
update_sql_part_one += '%s=\'%s\',' % (up_field, up_value)
# strip the last comma; result like: update tablea set a = '1',b = '2'
update_sql_part_one = update_sql_part_one.rstrip(',')
update_sql_part_two = 'where '
for tmp_item in filter['filter']:
field = tmp_item[0]
filter_key = tmp_item[1].split('|')
relationship = tmp_item[2]
# get the filter condition value. For example: where left(ip_loc,2) = 'China'
if field in filter_field_values:
filter_condition_value = filter_field_values[field]
else:
v_index = table_fields.index(field)
filter_condition_value = table_values[v_index]
if not relationship:
relationship = ''
if '(' in filter_key:
update_sql_part_two += "%s%s %s " % (filter_key, filter_condition_value, relationship)
else:
update_sql_part_two += "%s%s%s %s" % (field, filter_key, filter_condition_value, relationship) # merge the sql statement
update_sql = update_sql_part_one + update_sql_part_two.strip()
return update_sql def handle_message__packet(self, socket, handle_type=None, filter=None, handle_callback=None, is_recieved=False,
message_dict=None):
'''
socket: communication socket
handle_type: handle message type(HANDLE_GENERAL,HANDLE_INSERT,HANDLE_UPDATE)
filter: if handle_type is HANDLE_UPDATE,it is the update filter condithion
handle_callback: default is None!if not None,it will use the handle_callback to handle the message
is_recieved: default is False,if message is received.
message_dict: if is_recieved is True,it is the received message_dict
Note: if use some args,you need to use 'key = vale' to passing parameters
'''
if not is_recieved:
# if not recieved message,we receive the message and unpack the message
try:
message = self.recieve_packet(socket)
except Exception, e:
self.logger.error(str(e))
else:
message_dict = self.unpackaging_message(message) if message_dict:
if handle_callback:
# if callback is not None,use the handle_callback to hanle the message
handle_callback(message_dict)
else:
# Pasing packet and get information
# get bussiness type
business = message_dict['Business']
business_type = business['Type']
business_content = business['Content']
business_is_syncdb = business['Is_Syncdb']
business_table_name = business['Table_Name']
business_table_field = business['Table_Field'] # get error info
Error_info = message_dict['Error']
error_flag = Error_info['Is_Error']
error_type = Error_info['Error_Type']
error_info = Error_info['Error_Info'] # get packet info status
info_status = message_dict['Info_Status']
info_type = info_status['Info_Type']
is_end = info_status['Is_End']
token = info_status['Security_Token']
info_time = info_status['Time'] if handle_type == self.HANDLE_GENERAL:
# This packet is GENERAL communication! hanling upload_db response message to make sure that server handle the business
if business_type == communication_packet.GENERAL_INFO and business_content == communication_packet.HANDLE_OK and info_type == communication_packet.RESPONSE:
return True, is_end
elif error_flag:
self.logger.error(
'Message\'type is \'%s\' and message\'error_info is \'%s\'' % (error_type, error_info))
return False, is_end
else:
self.logger.error('This message packet is not the general message!')
return False, is_end
pass
elif handle_type == self.HANDLE_INSERT and info_type == communication_packet.RESPONSE:
handle_flag = self.insertdb_updatedb(self.HANDLE_INSERT, business_content, business_table_name,
business_table_field)
return handle_flag, is_end
pass
elif handle_type == self.HANDLE_UPDATE and info_type == communication_packet.RESPONSE:
return False, is_end
pass
elif handle_type == self.HANDLE_INERT_UPDATE and info_type == communication_packet.RESPONSE:
handle_flag = self.insertdb_updatedb(self.HANDLE_INERT_UPDATE, business_content, business_table_name,
business_table_field, filter)
return handle_flag, is_end
elif handle_type == self.HANDLE_UPDATE and info_type == communication_packet.RESPONSE:
handle_flag = self.insertdb_updatedb(self.HANDLE_UPDATE, business_content, business_table_name,
business_table_field)
return handle_flag, is_end
elif handle_type == self.HANDLE_FILE and info_type == communication_packet.RESPONSE:
handle_flag = self.handle_file_func(business_content)
return handle_flag, is_end
pass
else:
# 没有处理请求包,因为服务器不会主动发起请求
return False, is_end
pass else:
raise Exception('handle_response_packet\'message_dict is None!') pass def upload_db(self, communication_socket, table_name, query_sql=None, ALL=True, table_field=None, Error_info=None):
        # Fetch the db rows which will be sent
        # if db_data is relatively large it may raise exceptions, so control the amount of data
        DB_ERROR = False
        if query_sql:
            with self.lock:
                db_data = self.db.fechdb(query_sql)
                DB_ERROR = self.db.Error
            # If querying the db failed, return None
            if DB_ERROR:
                self.logger.error('Query DB Error! Current query_sql: %s' % query_sql)
                return None
            if not db_data:
                # if there is no data, nothing needs to be sent to the server; return None directly
                self.logger.info('No data needs to be uploaded! table_name: %s' % table_name)
                return None
            # Set packet args
            # def set_content_Business(self, b_type=Communication_Packet.GENERAL_INFO, b_content=Communication_Packet.HANDLE_OK, table_name=None, table_field=None, b_is_syncdb=False, targv=None):
            message = (Communication_Packet.UPLOAD_DB, str(db_data), table_name, table_field, True)
            # def set_content_Error(self, error_flag=False, error_type=None, error_info=None, targv=None):
            error_info = (False, None, None)
            # def set_content_Info_Status(self, info_type, security_token, time, is_end, targv=None):
            info_status = (Communication_Packet.REQEST, None, ALL)
            complete_send_data = self.packaging_message(message, error_info, info_status)
            try:
                communication_socket.sendall(complete_send_data)
            except Exception, e:
                self.logger.error('Send data error when upload_db!')
                raise e
            else:
                if ALL:
                    return True
                else:
                    # handle the response packet
                    try:
                        handle_flag, business_is_end = self.handle_general_response_packet(communication_socket)
                    except Exception, e:
                        raise e
                    else:
                        if handle_flag and business_is_end:
                            return True
                        else:
                            return False
        else:
            raise Exception('query_sql statement is None')

    def download_db(self, communication_socket, handle_type=None, handle_message_callback=None, filter=None):
        # Receive data from the server
        try:
            recieveData = self.recieve_packet(communication_socket)
        except Exception, e:
            self.logger.error('Download_db Error when receiving data!')
            raise e
        # unpack the received message into a dict
        Message_dict = self.unpackaging_message(recieveData)
        if not Message_dict:
            raise Exception('download_db Message_dict Error!')
        if handle_message_callback:
            # the callback is expected to return the same (handled, is_end) pair
            is_handle_complete, is_handle_again = handle_message_callback(Message_dict)
        else:
            is_handle_complete, is_handle_again = self.handle_message__packet(communication_socket, handle_type,
                                                                              is_recieved=True,
                                                                              message_dict=Message_dict, filter=filter)
        # the data is handled recursively: keep downloading until the packet marked Is_End arrives
        if is_handle_complete and not is_handle_again:
            self.download_db(communication_socket, handle_type, handle_message_callback, filter)
        else:
            # close the socket connection and leave the function
            communication_socket.close()

    def format_tuple(self, tup):
        '''
        A field fetched from the DB with MySQLdb is None when it is NULL in the table.
        Format None back to NULL so the row can be inserted into the DB again.
        '''
        valuelist = ['NULL' if t is None else t for t in tup]
        padlist = ['%s' if t is None else '\'%s\'' for t in tup]
        padstr = ','.join(padlist)
        return padstr % tuple(valuelist)
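    # A quick sketch of format_tuple's output (illustrative values only):
    #     format_tuple((1, None, 'abc'))  ->  "'1',NULL,'abc'"
    # i.e. a VALUES fragment in which None became the unquoted SQL literal NULL.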

    def sync_log_db(self, Start_Time=None, End_Time=None, Sync_All=False, ID=None):
        # pdb.set_trace()
        '''
        Sync the log db to the server!
        If an error occurs while syncing the log, it returns instead of raising an exception,
        because raising would impact the main program.
        '''
        self.logger.info('=====Start to sync log=====')
        try:
            # connect to the server and get a communication socket
            sock = self.__connect_server(self.serverIP, self.serverPort)
        except:
            raise
        last_update_id = None  # used later when updating last_update_id
        # query the log db to decide which rows need to be synced
        if Start_Time and End_Time:
            if datetime.datetime.strptime(Start_Time, '%Y-%m-%d %H:%M:%S') > datetime.datetime.strptime(End_Time,
                                                                                                        '%Y-%m-%d %H:%M:%S'):
                self.logger.error('-----sync_log_db args Start_Time and End_Time are invalid! End this task-----')
                # raise Exception('Start_Time and End_Time Error!')
                return
            query_cnt_sql = "select count(id),min(id),max(id) from phishing_log where time between '%s' and '%s';" % (
                Start_Time, End_Time)
        elif Sync_All:
            query_cnt_sql = "select count(1),min(id),max(id) from phishing_log;"
        elif ID:
            query_cnt_sql = "select count(1),min(id),max(id) from phishing_log where id > %s;" % ID
        else:
            query_update_id_sql = "select last_update_id,last_update_time from ph_log_sync_id;"
            DB_ERROR = False
            with self.lock:
                ID_Tuple = self.db.fechdb(query_update_id_sql)
                DB_ERROR = self.db.Error
            if DB_ERROR:
                self.logger.error('-----Get id from ph_log_sync_id Error. End this task-----')
                return
            last_update_id = ID_Tuple[0][0]
            query_cnt_sql = "select count(1),min(id),max(id) from phishing_log where id > %s;" % last_update_id
        # get the table fields!
        log_table_name = 'phishing_log'
        log_fields = self.get_table_filed(log_table_name, self.lock, self.db)
        if not log_fields:
            self.logger.error(
                '-----Terminate this task (sync_log_db) because getting the %s table fields failed!-----' % log_table_name)
            return
        with self.lock:
            data_cnt = self.db.fechdb(query_cnt_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            # record the error into the log and end the task
            self.logger.error('-----Get log data count Error when syncing the log db! End this task-----')
            return
        # if there are 10000 rows or more, sync them in batches
        cnt, min_id, max_id = data_cnt[0]
        upload_cnt = 1
        if cnt >= 10000:
            upload_cnt = (max_id - min_id + 1) / 10000 + 1
        # def upload_db(self, communication_socket, table_name, query_sql=None, ALL=True, table_field=None, Error_info=None):
        ALL = False
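        # Illustration of the batching math (hypothetical ids): with min_id=1 and
        # max_id=25000, upload_cnt = 25000/10000 + 1 = 3, giving the id ranges
        # [1,10000], [10001,20000] and [20001,25000]; only the last batch sets ALL=True.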
        for i in range(upload_cnt):
            start_id = min_id + i * 10000
            end_id = start_id + 10000 - 1
            if end_id >= max_id:
                end_id = max_id
                ALL = True
            query_log_sql = "select * from phishing_log where id >= %s and id <= %s;" % (start_id, end_id)
            try:
                handle_ok = self.upload_db(sock, log_table_name, query_log_sql, ALL, log_fields, None)
            except Exception, e:
                self.logger.error('-----' + str(e) + ' when syncing db!-----')
                return
            if handle_ok:
                continue
            else:
                self.logger.error("-----upload_db Error! And query_log_sql is '%s'. End this task-----" % query_log_sql)
                break
        else:
            # all batches uploaded: record the new last_update_id
            if last_update_id:
                cur_time = datetime.datetime.now()
                update_id_sql = "update ph_log_sync_id set last_update_id = '%s',last_update_time = '%s' where last_update_id = %s;" % (
                    max_id, cur_time, last_update_id)
                DB_ERROR = False
                with self.lock:
                    self.db.updatedb(update_id_sql)
                    DB_ERROR = self.db.Error
                if DB_ERROR:
                    # the update failed, so fall back to inserting a fresh row
                    insert_id_sql = "insert into ph_log_sync_id values(%s,'%s');" % (max_id, cur_time)
                    with self.lock:
                        self.db.insertdb(insert_id_sql)
                        DB_ERROR = self.db.Error
                    if DB_ERROR:
                        self.logger.error(
                            '-----Sync log db error when updating last_update_id! Old id is %s, new id is %s! End this task-----' % (
                                last_update_id, max_id))
        self.logger.info('=====End to sync log=====')

    def gain_newconfig_from_server(self):
        '''
        Get the newest configuration (the phishing website regular expressions) from the server!
        '''
        self.logger.info('=====Start to gain new config from Server...=====')
        try:
            sock = self.__connect_server(self.serverIP, self.serverPort)  # connect to the server
        except Exception, e:
            raise e
        # query the db to get the version info of the current local configuration
        query_version_sql = "select max(version),update_time from detection_version;"
        DB_ERROR = False
        with self.lock:
            version_tuple = self.db.fechdb(query_version_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----Get local configure version Error! End this task-----')
            return
        local_version = version_tuple[0][0]
        local_update_time = version_tuple[0][1]
        table_name = 'detection_version'
        # Get the detection_version fields; if that fails, terminate this task!
        table_field = self.get_table_filed(table_name, self.lock, self.db)
        if not table_field:
            self.logger.error(
                '-----Terminate this task (gain_newconfig_from_server) because getting the %s table fields failed!-----' % table_name)
            return
        message_info = (Communication_Packet.DOLOAD_DB, str(local_version), True, table_name, table_field)
        error_info = (False, None, None)
        message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)
        send_message = self.packaging_message(message_info, error_info, message_status)
        try:
            sock.sendall(send_message)
        except Exception, e:
            # if sending fails, return and terminate this business request
            self.logger.error('-----Error when sending the gain_newconfig_from_server request to the server!-----')
            return
        self.download_db(sock, self.HANDLE_INSERT)
        self.logger.info('=====End to gain new config from Server!=====')

    def get_detection_point_num(self):
        # Get detection_point_num from the config file or db and return it!
        # Read the config file to get the detection_point_num
        # wait to handle...
        detection_point_num = None
        is_detection_point = None
        # return order matches the callers: the flag first, then the point number
        return is_detection_point, detection_point_num

    def get_table_filed(self, table_name, lock, db):
        # get the table fields from the db
        query_detection_version_field_sql = "select COLUMN_NAME from information_schema.COLUMNS where table_name = '%s';" % table_name
        DB_ERROR = False
        with lock:
            detection_version_fields = db.fechdb(query_detection_version_field_sql)
            DB_ERROR = db.Error
        if DB_ERROR:
            # record the error and end the task
            self.logger.error('-----Get %s fields Error! End this task-----' % table_name)
            return
        else:
            # the query result is unicode, so encode it to utf-8; each row holds one column name
            table_field = [row[0].encode('utf-8') for row in detection_version_fields]
            return table_field
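    # Assuming fechdb returns the rows as 1-tuples, e.g. ((u'id',), (u'domain',), ...),
    # get_table_filed yields ['id', 'domain', ...] encoded as utf-8 strings.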

    def sync_info_db(self):
        '''
        1. Get the blacklist from the server and write it to a file for the linkage equipment
        2. Sync the info table down to this client point
        Which of the two happens depends on the config file
        '''
        self.logger.info('=====Start to get blacklist from Server=====')
        # Get a communication socket
        try:
            sock = self.__connect_server(self.serverIP, self.serverPort)  # connect to the server
        except Exception, e:
            self.logger.error(str(e))
            raise
        is_detection_point, point_num = self.get_detection_point_num()  # Need to handle again-----
        table_name = 'phishing_info'
        if is_detection_point:
            table_field = ('phishing_site',)  # note the comma: a 1-tuple, not a string
        else:
            # get the table fields from the db
            table_field = self.get_table_filed(table_name, self.lock, self.db)
            if not table_field:
                self.logger.error(
                    '-----Terminate this task (sync_info_db) because getting the %s table fields failed!-----' % table_name)
                return
        # pack the request packet
        message_info = (Communication_Packet.DOLOAD_DB, str(point_num), True, table_name, table_field)
        error_info = (False, None, None)
        message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)
        send_message = self.packaging_message(message_info, error_info, message_status)
        try:
            sock.sendall(send_message)
        except Exception, e:
            # if sending fails, end this task!
            self.logger.error('-----Error when sending the sync_info_db request to the server!-----')
            return
        if is_detection_point:
            # a detection point updates its local phishing_info table
            self.download_db(sock, self.HANDLE_INSERT)
        else:
            # otherwise write the data to a file!
            self.download_db(sock, self.HANDLE_FILE)
        self.logger.info('=====End to get phishing_info from Server!=====')

    def sync_white_list(self, sync_flag=0):
        '''
        sync_flag:
        0: first upload the white list, then download the white list from the server
        1: only upload the white list
        2: only download the white list
        '''
        # Get a communication socket
        try:
            sock = self.__connect_server(self.serverIP, self.serverPort)  # connect to the server
        except Exception, e:
            self.logger.error(str(e))
            raise
        if sync_flag == 0 or sync_flag == 1:
            self.sync_client_white_list_to_server(sock)
        if sync_flag == 0 or sync_flag == 2:
            self.sync_server_white_list_to_client(sock)
        # close the network socket, ending this task!
        sock.close()

    def sync_server_white_list_to_client(self, communication_socket):
        # pdb.set_trace()
        '''
        Sync the server white list down to this detection point
        communication_socket: network socket
        '''
        self.logger.info('=====Start sync_server_white_list_to_client!=====')
        # get the last update time of this client's white list
        query_sql = "select max(update_time) from white_list where valid = true;"
        DB_ERROR = False
        with self.lock:
            local_last_update_time = self.db.fechdb(query_sql)
            DB_ERROR = self.db.Error
        if DB_ERROR:
            self.logger.error('-----Get white list last update time Error! End this task!-----')
            return
        if local_last_update_time:
            last_update_time = local_last_update_time[0][0]
        else:
            last_update_time = None
        table_name = 'white_list'
        table_field = self.get_table_filed(table_name, self.lock, self.db)
        if not table_field:
            self.logger.error(
                '-----Terminate this task (sync_server_white_list_to_client) because getting the %s table fields failed!-----' % table_name)
            return
        # pack the request packet
        message_info = (Communication_Packet.DOLOAD_DB, str(last_update_time), True, table_name, table_field)
        error_info = (False, None, None)
        message_status = (Communication_Packet.REQEST, None, str(datetime.datetime.now()), True)
        send_message = self.packaging_message(message_info, error_info, message_status)
        try:
            communication_socket.sendall(send_message)
        except Exception, e:
            # if sending fails, end this task!
            self.logger.error('-----Error when sending the sync_server_white_list_to_client request to the server!-----')
            return
        # After sending the request, start to download the white list db data
        # update_sql = "update white_list set domain = '%s',valid = '%s',recorded = '%s',update_time = '%s',website_name = '%s',website_link = '%s',icp_code = '%s',organization = '%s' where domain = '%s';" % fielddata
        myfilter = dict()
        update_field = (
            'domain', 'valid', 'recorded', 'update_time', 'website_name', 'website_link', 'icp_code', 'organization')
        self.set_update_field(myfilter, update_field)
        # def set_filter(self, filter, table_field, filter_key, filter_relationship):
        self.set_filter(myfilter, 'domain', '=', None)
        self.logger.debug(str(myfilter))
        self.download_db(communication_socket, self.HANDLE_INSERT, filter=myfilter)

    def sync_client_white_list_to_server(self, communication_socket):
        # pdb.set_trace()
        '''
        Upload the local white list to the server!
        communication_socket: network socket
        '''
        self.logger.debug('=====Start sync_client_white_list_to_server!=====')
        # query the local white list data which needs to be uploaded
        query_sql = "select * from white_list where update_time is NULL and valid = 1;"
        query_modified_sql = "select * from white_list where update_time is not NULL and valid = 0;"
        # get the white_list table fields
        table_name = 'white_list'
        table_field = self.get_table_filed(table_name, self.lock, self.db)
        if not table_field:
            self.logger.error(
                '-----Terminate this task (sync_client_white_list_to_server) because getting the %s table fields failed!-----' % table_name)
            return
        # upload the newly added local white list entries
        handle_ok = self.upload_db(communication_socket, table_name, query_sql, False, table_field, None)
        if handle_ok:
            # upload the locally modified white list entries
            handle_ok = self.upload_db(communication_socket, table_name, query_modified_sql, True, table_field, None)
            if handle_ok:
                self.logger.info('Upload the locally modified white list OK!')
                self.logger.info('sync_client_white_list_to_server OK!')
            else:
                self.logger.error('-----UPLOAD the locally modified white list Error!-----')
        else:
            self.logger.error('-----UPLOAD the newly added local white list Error!-----')

    def task_scheduled(self):
        # multi-process scheduling of the tasks
        pass

    def start_client(self):
        '''
        --interval--
        Parameters:
        weeks (int) - number of weeks to wait
        days (int) - number of days to wait
        hours (int) - number of hours to wait
        minutes (int) - number of minutes to wait
        seconds (int) - number of seconds to wait
        start_date (datetime|str) - starting point for the interval calculation
        end_date (datetime|str) - latest possible date/time to trigger on
        timezone (datetime.tzinfo|str) - time zone to use for the date/time calculations
        --cron--
        Parameters:
        year (int|str) - 4-digit year
        month (int|str) - month (1-12)
        day (int|str) - day of the month (1-31)
        week (int|str) - ISO week (1-53)
        day_of_week (int|str) - number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
        hour (int|str) - hour (0-23)
        minute (int|str) - minute (0-59)
        second (int|str) - second (0-59)
        start_date (datetime|str) - earliest possible date/time to trigger on (inclusive)
        end_date (datetime|str) - latest possible date/time to trigger on (inclusive)
        timezone (datetime.tzinfo|str) - time zone to use for the date/time calculations (defaults to scheduler timezone)
        --add_job--
        Parameters:
        func - callable (or a textual reference to one) to run at the given time
        trigger (str|apscheduler.triggers.base.BaseTrigger) - trigger that determines when func is called
        args (list|tuple) - list of positional arguments to call func with
        kwargs (dict) - dict of keyword arguments to call func with
        id (str|unicode) - explicit identifier for the job (for modifying it later)
        name (str|unicode) - textual description of the job
        misfire_grace_time (int) - seconds after the designated run time that the job is still allowed to be run
        coalesce (bool) - run once instead of many times if the scheduler determines that the job should be run more than once in succession
        max_instances (int) - maximum number of concurrently running instances allowed for this job
        next_run_time (datetime) - when to first run the job, regardless of the trigger (pass None to add the job as paused)
        jobstore (str|unicode) - alias of the job store to store the job in
        executor (str|unicode) - alias of the executor to run the job with
        replace_existing (bool) - True to replace an existing job with the same id (but retain the number of runs from the existing one)
        '''
        # schedule the tasks as appropriate
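        # The jobs below all use the 'interval' trigger; a 'cron' trigger would look like
        # this hypothetical nightly job, using the apscheduler kwargs documented above:
        #     self.tasksched.add_job(self.sync_log_db, 'cron', hour=2, minute=30, max_instances=1)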
        # new_task_sched = BlockingScheduler()
        # new_task_sched.add_job(self.synchronous_DB, 'interval', minutes=5, max_instances=1)
        # new_task_sched.start()
        # print '===================='
        '''
        self.tasksched.add_job(self.synchronous_DB, 'interval', minutes=5, max_instances=1)
        self.tasksched.add_job(self.get_newconfig, 'interval', days=1, max_instances=1)
        self.tasksched.add_job(self.get__new_blacklist, 'interval', days=1, max_instances=1)
        self.tasksched.add_job(self.upload_white_list, 'interval', days=1, max_instances=10)
        self.tasksched.add_job(self.updatewhitelist, 'interval', days=1, max_instances=10)
        '''
        self.tasksched.add_job(self.synchronous_DB, 'interval', minutes=15, max_instances=1)
        self.tasksched.add_job(self.get_newconfig, 'interval', minutes=30, max_instances=1)
        self.tasksched.add_job(self.get__new_blacklist, 'interval', minutes=30, max_instances=1)
        self.tasksched.add_job(self.upload_white_list, 'interval', minutes=30, max_instances=1)
        self.tasksched.add_job(self.updatewhitelist, 'interval', minutes=30, max_instances=1)
        self.tasksched.start()

    def connecttest(self):
        self.logger.info('Start connecting to the server...')
        sock = self.__connect_server(self.serverIP, self.serverPort)  # connect to the server
        self.logger.info('connect server ok!')
        time.sleep(10)
        self.logger.info('close sock')
        sock.shutdown(socket.SHUT_RDWR)
        sock.close()

    def test(self):
        '''
        debug
        '''
        # pdb.set_trace()
        # self.setsysdbflag(True)
        # self.updatewhitelist()
        # self.sysdbFirstFlag = True
        # self.synchronous_DB()
        # self.get__new_blacklist()
        # self.get_newconfig()
        # self.connecttest()
        self.upload_white_list()
        # self.updatewhitelist()

    def muilti_test(self):
        jobs = []
        get_black_list_thread = threading.Thread(target=self.get__new_blacklist)
        get_black_list_thread.daemon = True
        get_config_thread = threading.Thread(target=self.get_newconfig)
        get_config_thread.daemon = True
        upload_white_list_thread = threading.Thread(target=self.upload_white_list)
        upload_white_list_thread.daemon = True
        update_white_list_thread = threading.Thread(target=self.updatewhitelist)
        update_white_list_thread.daemon = True
        get_black_list_thread.start()
        jobs.append(get_black_list_thread)
        get_config_thread.start()
        jobs.append(get_config_thread)
        upload_white_list_thread.start()
        jobs.append(upload_white_list_thread)
        update_white_list_thread.start()
        jobs.append(update_white_list_thread)
        for t in jobs:
            t.join()


if __name__ == '__main__':
    '''
    Starting processes causes problems, so only threads are started; the exact cause has not been found yet
    '''
    # mydb = SQLdb(dbname='tmp')
    # myclient = Client(db=mydb)
    myclient = Client()
    if len(sys.argv) >= 2:
        try:
            flag = sys.argv[1]
        except:
            pass
        else:
            try:
                # turn 'true'/'false' (any case) into the bool True/False
                flag = eval(flag.title())
            except:
                raise Exception('ARGV ERROR!')
            else:
                if isinstance(flag, bool):
                    myclient.setsysdbflag(flag)
                else:
                    raise Exception('ARGV ERROR!')
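    # e.g. `python client.py true` sets the sync-db flag before the scheduler starts;
    # any argument other than true/false aborts with ARGV ERROR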
    # myclient.test()
    # myclient.muilti_test()
    myclient.start_client()
    '''
    jobs = []
    for i in range(50):
        tmpclient = Client()
        newprocess = threading.Thread(target=Client.test, args=(tmpclient,))
        # newprocess = multiprocessing.Process(target=Client.test, args=(tmpclient,))  # using processes fails here; some sources suggest it is a bug
        newprocess.start()
        jobs.append(newprocess)
    for t in jobs:
        t.join()
    '''
