多进程 multiprocessing 多线程Threading 线程池和进程池concurrent.futures

时间:2022-01-04 21:34:53
multiprocessing.procsess
定义一个函数
def func():pass
在if __name__=="__main__":中实例化
p = process(target=子进程要执行的函数,args(函数的参数且必须以元组的方式传参))
p.start() 开启子进程
p.join() 感知子进程的结束,主进程等待子进程执行完后才退出
p.terminate() 结束一个子进程
p.is_alive() 查看某个进程是否还在运行 属性
p.name p.pid 进程名和进程号
p.deamon 守护进程,主进程的代码执行完,子进程也随即结束,在子进程开始执行前设置
用类开启子进程的方式,必须有run()
Myprocsess(procsess):
def __init__(name,*args,**kwargs):pass
def run():pass
p = Myprocsess('name') **在子进程中不要出现input
# lock semaphore event queue joinableQueue 对象在主进程创建 multiprocessing.Lock
实例化 锁 对数据的一种保护方式,一次只能一个进程对数据进行操作
l = Lock()
l.acquire() 拿钥匙 加锁,其他进程不能访问
l.release() 还钥匙 释放锁,释放了之后,其他进程才能拿锁对数据进行操作
其余进程阻塞等待 multiprocessing.semaphore
实例化信号量,传入同时允许访问的进程的数量
sem = semaphore(4)
sem.acquire() 获取钥匙,同时允许4个进程访问
sem.release() 还钥匙,让其他进程得以访问 multiprocessing.Event
通过一个信号来控制多个进程,一个信号可以让所有的进程都阻塞,也可以解除所有进程的阻塞
e = Event()
e.is_set() 查看一个事件的状态,默认被设置成False阻塞,True为不阻塞
e.set() 修改一个事件的状态为Ture
e.clear() 修改一个事件的状态为Flase
e.wait() 是依据事件的状态来决定是否阻塞,Flase阻塞,True不阻塞 multiprocessing.Queue
队列 先进先出
q = Queue(队列数)
q.put() 放一个进队列
q.get() 从队列中取一个出来
q.full() 队列是否满了
q.empty() 队列是否空了 multiprocessing.JoinableQueue
q = JoinableQueue(20)
q.put() 放入队列,count +1
q.join() # 阻塞 直到一个队列中的所有数据 全部被处理完毕
q.get() 取消息
q.task_done() # count -1 在处理完get到的消息后,处理完毕后告诉队列 # 队列中只能存放实例化时设置的参数的个数,队列未满才会继续put
# 使用JoinableQueue,get()后用.task_done(),这样队列put端put后的join将在最后一次调用task_done后不阻塞 multiprocessing.Pipe
con,pro = Pipe() # 数据不安全性,加锁
process(func,args=(con,pro))
con.recv()
con.close()
pro.send(f)
pro.close()
# con,pro不用send或recv就close,只留一个recv取完数据将会引发EOFError异常
multiprocessing.Manager
m = Manager()
dic=m.dict({'count':100})
p = Process(target=func,args=(dic,l))
# manager数据间的共享,不同子进程对数据修改,主进程可拿值,不安全,加锁 multiprocessing.Pool
p = Pool(5)
p.map(func,iterable) # 默认的异步执行任务,切自带close和join功能 p.Pool(5)
p.apply_async(func,args=(i,)) # 异步调用 和主进程完全异步
p.close() # 结束进程池接收任务
p.join() # 感知进程池中的任务执行结束 res = p.apply_async(func,args=(i,)) # 返回对象
对象.get() # 阻塞等待结果 func对象的计算返回结果
ret = p.map(func,iterable) # 等待所有的进程执行完,将func的计算结果用列表返回 p.apply_async(func,args=(i,),callback=func2) # 回调函数是在主进程执行
# 回调函数,不能接收参数,只能将func的执行结果作为参数传给回调函数
多线程Threading
# 多线程和多进程的使用类似。
# 进程 是 最小的 内存分配单位
# 线程 是 操作系统调度的最小单位
# 线程直接被CPU执行,进程内至少含有一个线程,也可以开启多个线程
# 开启一个线程所需要的时间要远远小于开启一个进程
# 多个线程内部有自己的数据栈,数据不共享
# 全局变量在多个线程之间是共享的
# GIL锁(即全局解释器锁)
# 在Cpython解释器下的python程序 在同一时刻 多个线程中只能有一个线程被CPU执行
# 高CPU : 计算类 --- 高CPU利用率
# 高IO : 爬取网页 200个网页 # qq聊天 send recv # 处理日志文件 读文件 # 处理web请求 # 读数据库 写数据库
threading.Thread(target=wahaha,args=(i,)).start()
print(threading.active_count())
print(threading.current_thread())
print(threading.enumerate()) threading.Condition
con = Condition() # 条件 # 一个条件被创建之初 默认有一个False状态 # False状态 会影响wait一直处于等待状态
con.acquire()
con.wait() # 等钥匙
con.notify(num) # 造钥匙
con.release()
# 用完即销毁,notify(数量),条件状态设置为notify(数量)个True,wait()通过一次,就减少一个True threading.Timer # 定时器
t = Timer(5,func).start() # 非阻塞的
# 只是延时执行func函数,异步开启线程
import queue
q = queue.Queue() # 队列 先进先出
q = queue.LifoQueue() # 栈 先进后出
q = queue.PriorityQueue()
q.put((20,'a'))
# 优先级队列,第一个元组元素是优先级,数字越小越优先,优先级相同,消息按ascll排序
# concurrent.futures 线程池和进程池
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
pool = ThreadPoolExecutor(max_workers=5) # 设置进程池中的进程个数
pool.map(func,range(20)) # == for i in range(20):submit(func,i)
# 拿不到返回值,参数 *iterables
p = pool.submit(func,i)
# 返回对象,p.result()取值 添加回调函数 .add_done_callback(call_back) ,回调函数接收参数p对象,用p.result()取值
pool.shutdown() # close+join from gevent import monkey;monkey.patch_all()
import gevent # 协程 本质上是一个线程 # print(threading.current_thread().getName()) # 虚拟线程
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
g1.join()
g2.join() # gevent.joinall(g_lst) # for g in g_lst:g.join()
# 进程和线程的任务切换右操作系统完成
# 协程任务之间的切换由程序(代码)完成,只有遇到协程模块能识别的IO操作的时候,程序才会进行任务切换,实现并发的效果