上下文管理、线程池、redis订阅和发布

时间:2024-01-01 19:54:51

一:上下文管理:

对于一些对象在使用之后,需要关闭操作的。比如说:socket、mysql数据库连接、文件句柄等。

都可以用上下文来管理。

语法结构:

     Typical usage:

         @contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup> This makes this: with some_generator(<arguments>) as <variable>:
<body>

code:

 import socket
import contextlib @contextlib.contextmanager
def sock_server(host,port):
sk=socket.socket()
sk.bind((host,port))
sk.listen()
try:
yield sk
finally:
sk.close() with sock_server("127.0.0.1",) as soc:
print(soc)

执行顺序:

aaarticlea/png;base64," alt="" />

解释:python从上到下依次解释:

1、当到with的时候,执行with内socket_server("127.0.0.1",),跳到

2、被contextlib.contextmanager装饰的函数。

3、依次执行函数socket_server到yield 并把sk返回给第4步的sco变量

4、然后执行with下面的代码块,执行print语句。

5、当with语句的代码块执行完。跳到第3步的yeild。

6、执行finally语句里的代码块。

二:线程池(threadpool)

自己版本:

 #!/bin/env python
#author:evil_liu
#date:--
#description: thread pool import threading
import time
import queue class Thread_Poll:
'''
功能:该类主要实现多线程,以及线程复用。
'''
def __init__(self,task_num,max_size):
'''
功能:该函数是初始化线程池对象。
:param task_num: 任务数量。
:param max_size: 线程数量。
:return:无。
'''
self.task_num=task_num
self.max_size=max_size
self.q=queue.Queue(task_num)#设置任务队列的。
self.thread_list=[]
self.res_q=queue.Queue()#设置结果队列。 def run(self,func,i,call_back=None):
'''
功能:该函数是线程池运行主函数。
:param func: 传入任务主函数。
:param *args: 任务函数参数,需要是元组形式。
:param call_back: 回调函数。
:return: 无。
'''
if len(self.thread_list)<self.max_size:#如果目前线程数小于我们定义的线程的个数,进行创建。
self.creat_thread()
misson=(func,i,call_back)#往任务队列放任务。
self.q.put(misson) def creat_thread(self):
'''
功能:该函数主要是创建线程,并调用call方法。
:return: 无。
'''
t=threading.Thread(target=self.call)#创建线程
t.start() def call(self):
'''
功能:该函数是线程循环执行任务函数。
:return: 无。
'''
cur_thread=threading.currentThread
self.thread_list.append(cur_thread)
event=self.q.get()
while True:
func,args,cal_ba=event#获取任务函数。
try:
res=func(*args)#执行任务函数。注意参数形式是元组形式。
flag="OK"
except Exception as e:
print(e)
res=False
flag="fail"
self.res(res,flag)#调用回调函数,将执行结果返回到队列中。
try:
event=self.q.get(timeout=)#如果任务队列为空,获取任务超时2s超过2s线程停止执行任务,并退出。
except Exception:
self.thread_list.remove(cur_thread)
break
def res(self,res,status):
'''
功能:该方法主要是将执行结果方法队列中。
:param res: 任务函数的执行结果。
:param status: 执行任务函数的结果,成功还是失败。
:return: 无。
'''
da_res=(res,status)
self.res_q.put(da_res) def task(x,y):
'''
功能:该函数主要需要执行函数。
:param x: 参数。
:return: 返回值1,表示执行成功。
'''
print(x)
return x+y
def wri_fil(x):
'''
功能:该函数主要讲结果队列中的结果写入文件中。
:param x: 任务长度。
:return: 无。
'''
while True:#将执行结果,从队列中获取结果并将结果写入文件中。
time.sleep()
if pool.res_q.qsize()==x:#当队列当前的长度等于任务执行次数,表示任务执行完成。
with open('1.txt','w') as f1:
for i in range(pool.res_q.qsize()):
try:
data=pool.res_q.get(timeout=)
f1.write('mission result:%s,status:%s\n'%data)
except Exception:
break
break
else:
continue
if __name__ == '__main__':
pool=Thread_Poll(,)#初始化线程池对象。
for i in range():#循环任务。
pool.run(task,(,))
wri_fil()

老师版本:注意老师在创建线程的时候,如果此时任务队列中没有任务的时候,不会创建其他线程。在线程执行完任务之后,将线程加入空闲线程的列表中,然后让当前线程去队列里获取任务,利用queue里的get()方法阻塞的作用的,如果一直阻塞的话,

然后表示空闲的列表中的加入的线程 一直有,此时表示创建线程数已经满足任务需求,如果不阻塞则空闲线程列表里没有空余线程。而是获取任务,执行任务。

 #!/usr/bin/env python
# -*- coding:utf- -*- import queue
import threading
import contextlib
import time StopEvent = object() class ThreadPool(object): def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = []
self.free_list = [] def run(self, func, args, callback=None):
"""
线程池执行一个任务
:param func: 任务函数
:param args: 任务函数所需参数
:param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;、任务函数返回值(默认为None,即:不执行回调函数)
:return: 如果线程池已经终止,则返回True否则None
"""
if self.cancel:
return
if len(self.free_list) == and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w) def generate_thread(self):
"""
创建一个线程
"""
t = threading.Thread(target=self.call)
t.start() def call(self):
"""
循环去获取任务函数并执行任务函数
"""
current_thread = threading.currentThread()
self.generate_list.append(current_thread) event = self.q.get()
while event != StopEvent: func, arguments, callback = event
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = None if callback is not None:
try:
callback(success, result)
except Exception as e:
pass with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else: self.generate_list.remove(current_thread) def close(self):
"""
执行完所有的任务后,所有线程停止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= def terminate(self):
"""
无论是否还有任务,终止线程
"""
self.terminal = True while self.generate_list:
self.q.put(StopEvent) self.q.queue.clear() @contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于记录线程中正在等待的线程数
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread) # How to use pool = ThreadPool() def callback(status, result):
# status, execute action status
# result, execute action return value
pass def action(i):
print(i) for i in range():
ret = pool.run(action, (i,), callback) time.sleep()
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()