python 3.5构建WINDOWS推送服务

时间:2023-12-20 17:37:32
import ConfigParser
import os
import sys cf = ConfigParser.ConfigParser()
#绝对路径获取
ABSPATH=os.path.abspath(sys.argv[0])
ABSPATH=os.path.dirname(ABSPATH)+"/" cf.read(ABSPATH +'digest.conf')

利用PYTHON3.5构建WINDOWS服务

功能介绍:

利用PYTHON服务,向用户进行数据推送服务

实现的功能:

1、客户端验证

2、配置文件加解密

3、MSSQL数据库的读取

4、日志记录

服务端:

#!/usr/bin/env python
# -*- coding: UTF8 -*-
import win32serviceutil
import win32service
import win32event
import win32timezone
import sys
import servicemanager
import os,sys,socket
rootdir =os.path.abspath(sys.argv[0])
rootdir =os.path.dirname(rootdir) +"/"
import select,pymssql,logging,configparser
from logging.handlers import TimedRotatingFileHandler
# 以下_mssql decimal 模块用于打包PY文件时
import _mssql
import decimal
import time,json
import threading
import hashlib
cf =configparser.ConfigParser()
cf.read(rootdir +'socket_server.conf') formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%Y/%m/%d %H:%M:%S')
logger = logging.getLogger("log") # name
logger.setLevel(logging.DEBUG)
try:
# 按1天进行日志切割,同时保存20天的日志
fh =TimedRotatingFileHandler(rootdir +"log/"+ "sock_server.log","d",1,20)
except:
os.makedirs(rootdir +"log/")
fh =TimedRotatingFileHandler(rootdir +"log/"+ "sock_server.log","d",1,20)
fh.setLevel(logging.DEBUG)
fh.setFormatter(formatter)
# 给log 添加Handler
logger.addHandler(fh) # 加密函数
def encrypt(s,key=20):
"""
加密方法
:param key:
:param s:
:return:
"""
b = bytearray(str(s).encode("gbk"))
n = len(b) # 求出 b 的字节数
c = bytearray(n*2)
j = 0
for i in range(0, n):
b1 = b[i]
b2 = b1 ^ key # b1 = b2^ key
c1 = b2 % 16
c2 = b2 // 16 # b2 = c2*16 + c1
c1 = c1 + 65
c2 = c2 + 65 # c1,c2都是0~15之间的数,加上65就变成了A-P 的字符的编码
c[j] = c1
c[j+1] = c2
j = j+2
return c.decode("gbk")
# 解密函数
def decrypt(s,key=20):
"""
解密方法
:param key:
:param s:
:return:
"""
c = bytearray(str(s).encode("gbk"))
n = len(c) # 计算 b 的字节数
if n % 2 != 0 :
return ""
n = n // 2
b = bytearray(n)
j = 0
for i in range(0, n):
c1 = c[j]
c2 = c[j+1]
j = j+2
c1 = c1 - 65
c2 = c2 - 65
b2 = c2*16 + c1
b1 = b2^ key
b[i]= b1
try:
return b.decode("gbk")
except:
return "failed"
# 初始华MSSQL类方法
class MSSQL:
"""
对pymssql的简单封装
pymssql库,该库到这里下载:http://www.lfd.uci.edu/~gohlke/pythonlibs/#pymssql
使用该库时,需要在Sql Server Configuration Manager里面将TCP/IP协议开启
用法:
"""
def __init__(self,host,user,pwd,db,port =1433):
self.host = host
self.port = port
self.user = user
self.pwd = pwd
self.db = db
def __GetConnect(self):
"""
得到连接信息
返回: conn.cursor()
"""
try:
if not self.db:
logger.info("没有设置数据库信息")
self.conn = pymssql.connect(host=self.host,port=self.port,user=self.user,password=self.pwd,database=self.db,charset="utf8")
cur = self.conn.cursor()
if not cur:
logger.info("连接数据库失败!")
else:
return cur
except Exception as E:
logger.info("连接数据库异常,请检查配置!错误信息:%s"%E)
def ExecQuery(self,sql):
"""
执行查询语句
返回的是一个包含tuple的list,list的元素是记录行,tuple的元素是每行记录的字段
"""
cur = self.__GetConnect()
cur.execute(sql)
resList = cur.fetchall()
# 查询完毕后必须关闭连接
self.conn.close()
return resList def ExecNonQuery(self,sql):
"""
执行非查询语句 调用示例:
cur = self.__GetConnect()
cur.execute(sql)
self.conn.commit()
self.conn.close()
"""
cur = self.__GetConnect()
cur.execute(sql)
self.conn.commit()
self.conn.close()
# SQL语句执行模块
def mssql(conn):
""" 对指定数据进行查询,并返回结果!"""
ms = MSSQL(host=cf.get("DB","ip"),user=decrypt(cf.get("DB","username")),pwd=decrypt(cf.get("DB","password")),db=cf.get("DB","db"),port=int(cf.get("DB","port")))
x = 0
while True:
try:
sql ="select top 10 ChatContentID ,a.Siteid,ChatContent,ChatMemberName,ChatMemberLevelID,ChatMemberLevelTitle,AccMemberName,AccMemberLevelID,AccMemberLevelTitle,a.States,ChatType,IsPush,IsRobot,a.CreateDate,b.MemberID from dbo.ChatContent a left join Member b on a.ChatMemberPhone=b.Phone where ChatContentID !=0 and ChatType in (0,1,2,6) and ChatContentID>%s order by CreateDate desc"%x
res = ms.ExecQuery(sql)
if res != "":
x = res[0][0]
json_res = ""
result = {}
for i in res:
result["ChatContentID"]=i[0]
result["Siteid"] = i[1]
result["ChatContent"]=i[2]
result["ChatMemberName"]=i[3]
result["ChatMemberLevelID"] =i[4]
result["ChatMemberLevelTitle"] =i[5]
result["AccMemberName"] = i[6]
result["AccMemberLevelID"] =i[7]
result["AccMemberLevelTitle"] =i[8]
result["States"]=i[9]
result["ChatType"]=i[10]
result["IsPush"]=i[11]
result["IsRobot"] =i[12]
result["CreateDate"] =str(i[13])
result["MemberID"] = i[14]
json_res += json.dumps(result)+"\r\n"
conn.send(bytes(json_res,encoding="utf8"))
except Exception as e:
# 没有数据等待10秒
time.sleep(int(cf.get("TIME","conn_time")))
continue
# SOCKET主进程模块
def process(conn,addr): try:
i = 0
# 认证失败允许重试3次
while i < 3:
flage = False
# 接收客户端连接请求信息
info = conn.recv(1000)
# 实例化加密函数
hash = hashlib.sha512()
hash.update(bytes("",encoding="utf8")) # KEY=123
hash_pwd = hash.hexdigest()
if info.decode() == hash_pwd:
logger.info("客户端ip:[%s]认证成功!"%addr[0])
flage = True
# 接收用户及密码信息
while flage:
mssql(conn)
else:
# 登陆失败,发送给客户端重新验证
i += 1
logger.info("客户端ip:[%s]认证失败!"%addr[0])
conn.send(bytes("验证失败!","utf8"))
if i > 2:
# 主动关闭连接
logger.warning("客户端ip:[%s]超过认证限制,服务端强制中断连接!"%addr[0])
conn.close()
except Exception as e:
logger.debug(e)
conn.close()
# SOCKET服务模块
def sock_server():
'''
启动服务器端,开启线程监听
:return:
''' server = socket.socket()
server_ip=cf.get("HOST","ip")
server_port = int(cf.get("HOST","port"))
# server_ip ="localhost"
# server_port = 4561
server.bind((server_ip,server_port))
server.listen(10)
while True:
r,w,e = select.select([server,], [], [], 1)
for i,server in enumerate(r):
conn,addr = server.accept()
# 创建线程
t = threading.Thread(target=process, args=(conn, addr))
# 启动线程
t.start()
# 建立WINDOWS服务框架
class win32test(win32serviceutil.ServiceFramework): _svc_name_ = "socketserver"
_svc_display_name_ = "socketserver_display"
_svc_description_ = "socketserver_disc"
def __init__(self, args):
self.run = True
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
def SvcDoRun(self):
while self.run:
# 调用用户程序
sock_server()
win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)
def SvcStop(self):
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
self.run =False
if __name__=='__main__':
if len(sys.argv) == 1:
try:
servicemanager.Initialize()
servicemanager.PrepareToHostSingle(win32test)
servicemanager.StartServiceCtrlDispatcher()
except:
win32serviceutil.usage() else:
win32serviceutil.HandleCommandLine(win32test)

客户端:

#!/usr/bin/env python3.5
# -*-coding:utf8-*-
"""
本实例客户端用于不断接收不定长数据,存储到变量res
"""
import socket,time,hashlib,json
ip_port = ('192.168.1.189',1888)
sk = socket.socket()
sk.connect(ip_port)
sk.setblocking(0) # 非阻塞模式,当接收没有发现任何数据时出异常 while True:
user_input=input("cmd>>:").strip()
# 重新初始化加密函数
hash = hashlib.sha512()
if len(user_input) ==0:continue
if user_input =="q":break
hash.update(bytes(user_input,encoding="utf8"))
hash_pwd = hash.hexdigest()
# print(hash_pwd) # 打印生成的加密函数
sk.send(bytes(hash_pwd,'utf8'))
res = ""
while True:
try:
time.sleep(0.1)
time1 = time.time()
server_replay = sk.recv(8000).decode()
res += str(server_replay)
except BlockingIOError:
time2 = time.time()
print("接收数据完成,耗时:%s秒" %(time2-time1))
break
result = res.split("\r\n")
try :
for i in result:
xx = json.loads(i,"utf8")
print(xx)
except Exception:
pass
res = "" sk.close()