Python 并发编程

时间:2023-03-09 04:07:56
Python 并发编程

进程

开启进程

from multiprocessing import Process
import time def task(name):
print('%s is running' %name)
time.sleep(2) #在windows系统下,开子进程的代码必须写到这一行下面
if __name__ == '__main__':
p=Process(target=task,args=('egon',)) # 设置进程的任务及参数
p.start() #只是在给操作系统发了一个信号,让操作系统去开进程(申请内存+拷贝父进程的地址空间)
print('主') #主
#egon is running

第二种开启进程的方式

from multiprocessing import Process
import time
# 通过继承Process类来开启进程
class Myprocess(Process):
def __init__(self,name): # 初始化,主要是为了传递一些参数
super().__init__()
self.name=name
def run(self): # 执行的任务
time.sleep(3)
print('%s is running' % self.name)
time.sleep(2) # 在windows系统下,开子进程的代码必须写到这一行下面
if __name__ == '__main__':
p = Myprocess('egon')
p.start() # p.run()
print('主') #主
#egon is running

Process对象的常用方法

start() : 用于开启一个子进程,上面可以看到

join() : 等待子进程完成 , 前面代码之所以先打印主, 是因为开启进程时加载资源消耗时间

from multiprocessing import Process
import time
#
class Myprocess(Process):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
time.sleep(3)
print('%s is running' % self.name)
time.sleep(2) if __name__ == '__main__': # 在windows系统下,开子进程的代码必须写到这一行下面
p = Myprocess('egon')
p.start() # p.run()
p.join()
print('主') #egon is running
#主 #p.join([timeout])
#主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,
#需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
#在上面代码中等待子进程结束的时候,p1,p2两个进程是同时运行的
#如果staer后紧跟join则会等待子进程完成后才会执行就失去了并发的功能

terminate()

terminate是给操作系统发信号,让操作系统去关闭进程 #然而这并没有什么卵用,强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁

#terminate()
from multiprocessing import Process
import time,random
def func(name):
print('%s is runing' %name)
time.sleep(2)
print('%s is done' %name) if __name__ == '__main__':
p1 = Process(target=func,args=("one",))
p1.start()
p1.terminate()
p1.join()
print("hahaha") #执行结果
#hahaha

is_alive()

is_alive是判断子进程是否在运行, 运行返回True,不运行返回Flase

from multiprocessing import Process
import time,random
def func(name):
print('%s is runing' %name)
time.sleep(2)
print('%s is done' %name) if __name__ == '__main__':
p1 = Process(target=func,args=("one",))
p1.start()
print(p1.is_alive())
p1.join()
print("hahaha") #执行结果
#True
#one is runing
#one is done
#hahaha

name pid

name是Process对象的一个属性,由默认值,也可以自己指定

pid 进程ID

def piao(name):
print('%s is wuwuwa' % name) if __name__ == '__main__':
p1=Process(target=piao,args=('alex',))
p2=Process(target=piao,args=('alex',))
p3=Process(target=piao,args=('alex',),name="hahaha") p1.start()
print(p1.name)
print(p2.name)
print(p3.name) print(p1.pid) print('主')
#运行结果
#Process-1
#Process-2
#hahaha
#1468
#主
#alex is wuwuwa

进程池

操作系统不可能无线开启进程,进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

我们就可以通过维护一个进程池来控制进程数目

创建一个进程池

from concurrent.futures import ProcessPoolExecutor
import time,os def piao(name,n):
print("%s is piaoing %s"%(name,os.getpid())) # 打印了进程id
time.sleep(1) if __name__ == "__main__":
p = ProcessPoolExecutor(4) # 指定进程池最大进程个数
for i in range(10):
obj = p.submit(piao,"alex %s"%i,i) # 提交任务,去开启子进程 #运行结果
#alex 0 is piaoing 6136
#alex 1 is piaoing 8872
#alex 2 is piaoing 420
#alex 3 is piaoing 5644 #alex 4 is piaoing 6136
#alex 5 is piaoing 8872
#alex 6 is piaoing 420
#alex 7 is piaoing 5644 #alex 8 is piaoing 6136
#alex 9 is piaoing 8872
#进程ID始终就是四个,另外注意,这里写的都是停一秒,所以他们每次都运行完四个。
#事实上进程池是有一个进程执行完之后,就有一个进程补进来

当有操作需要在子进程都结束之后执行时,就会需要一个类似于join()的操作

from concurrent.futures import ProcessPoolExecutor
import time,os,random def piao(name,n):
print("%s is piaoing %s"%(name,os.getpid())) # 打印了端口号
time.sleep(random.randint(2,3)) if __name__ == "__main__":
p = ProcessPoolExecutor(4) # 指定进程池最大进程个数
for i in range(10):
obj = p.submit(piao,"alex %s"%i,i) # 提交子进程 p.shutdown(wait=True) # 在没有这句话的情况下,肯定会先执行下面的操作
# 这句话的意思是进程池中不会再添加进程,等到进程池中的进程运行结束后执行下面的代码
print("哈哈哈哈")
#运行结果 #alex 0 is piaoing 12000
#alex 1 is piaoing 904
#alex 2 is piaoing 6120
#alex 3 is piaoing 8328 #alex 4 is piaoing 904
#alex 5 is piaoing 8328 #alex 6 is piaoing 12000
#alex 7 is piaoing 6120 #alex 8 is piaoing 904
#alex 9 is piaoing 8328
#哈哈哈哈

子进程有返回值-result()

from concurrent.futures import ProcessPoolExecutor
import time,os,random def piao(name,n):
print("%s is piaoing %s"%(name,os.getpid())) # 打印了端口号
time.sleep(random.randint(2,3))
return n*2
if __name__ == "__main__":
p = ProcessPoolExecutor(4) # 指定进程池最大进程个数
objs = []
for i in range(10):
# 这属于同步调用,要等到obj拿到结果后才会执行之后的代码
# obj = p.submit(piao,"alex %s"%i,i).result()
# print(obj)
# 异步调用
obj = p.submit(piao,"alex %s"%i,i) # 这里只提交进程,并不拿到他们的结果,并且把进程赋值给一个变量
objs.append(obj) # 把进程追加进列表中
for obj in objs:
print(obj.result()) # 从列表中拿到进程的返回值
# 这里提交进程时并不会遇到阻塞,进程池中的四个进程是同时运行的,一个进程运行完,就会有另一个进程开始运行
# 在拿结果的时候可能有一个没有运行完但他后面的就运行完了,他不会跳过去取结果,会等待结果出来,这时后面的
# 结果是计算出来的所以不用等待就能拿到结果 p.shutdown(wait=True)
print("哈哈哈哈")

守护进程

由主进程开启守护进程,主进程结束后守护进程随之结束

from multiprocessing import Process
import os,time,random def task():
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid()) if __name__ == '__main__':
p=Process(target=task)
p.daemon = True #1、必须在p.start()之前 2:守护进程不能开启子进程
p.start()
# p.join()
print('主')
#结果
#主

需要注意的是 :

  • 守护进程会在主进程代码执行结束后就终止
  • 守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

竞争带来的结果就是错乱,如何控制,就是加锁处理

并发造成的问题

#并发运行,效率高,但竞争同一打印终端,带来了打印错乱
from multiprocessing import Process,Lock
import os,time,random def task():
print("111111")
time.sleep(2)
print("222222")
time.sleep(2)
print("333333") if __name__ == "__main__":
mutex = Lock()
p1 = Process(target=task)
p2 = Process(target=task)
p3 = Process(target=task)
p1.start()
p2.start()
p3.start()
#运行结果
#111111
#111111
#111111
#222222
#222222
#222222
#333333
#333333
#333333

通过加锁避免该问题

from multiprocessing import Process,Lock
import os,time,random def task(mutex):
mutex.acquire() # 获得锁
print("111111")
time.sleep(2)
print("222222")
time.sleep(2)
print("333333")
mutex.release() # 释放锁 if __name__ == "__main__":
mutex = Lock()
p1 = Process(target=task,args=(mutex,)) # 将锁对象传入任务重
p2 = Process(target=task,args=(mutex,))
p3 = Process(target=task,args=(mutex,))
p1.start()
p2.start()
p3.start()
#运行结果
#111111
#222222
#333333
#111111
#222222
#333333
#111111
#222222
#333333

互斥锁与join的区别

join是等一个进程运行完后下一个进程才运行,而互斥锁是进程都执行了,不过这个任务每次都只能有一个进程执行,谁抢到谁运行。

且join是全局的,一个进程没完不会开启下一个进程,互斥锁可以局部进行,专门为某一个任务加锁。

买火车票

def search():
with open('db.txt',encoding='utf-8') as f:
dic=json.load(f)
print('%s 剩余票数 %s' %(os.getpid(),dic['count'])) def get():
with open('db.txt',encoding='utf-8') as read_f:
dic=json.load(read_f) if dic['count'] > 0:
dic['count']-=1
time.sleep(random.randint(1,3)) #模拟手速+网速
with open('db.txt','w',encoding='utf-8') as write_f:
json.dump(dic,write_f)
print('%s 抢票成功' %os.getpid()) def task(mutex):
search()
mutex.acquire()
get() # 只为买票加锁
mutex.release() if __name__ == '__main__':
mutex=Lock()
for i in range(20):
p=Process(target=task,args=(mutex,))
p.start()
#结果-并不影响查票,但买票只能一个一个买
10356 剩余票数 1
15436 剩余票数 1
10008 剩余票数 1
14496 剩余票数 1
5668 剩余票数 1
15884 剩余票数 1
7288 剩余票数 1
7308 剩余票数 1
15740 剩余票数 1
152 剩余票数 1
12260 剩余票数 1
14988 剩余票数 1
15396 剩余票数 1
15696 剩余票数 1
1036 剩余票数 1
7632 剩余票数 1
15392 剩余票数 1
14600 剩余票数 1
12408 剩余票数 1
16168 剩余票数 1
10356 抢票成功

线程

线程是CPU的执行单位,一个进程内默认就会有一个控制线程,该控制线程可以执行代码从而创建新的线程

多线程是在一个进程中开启多个线程,线程之间资源共享。这些线程共享一个进程的地址空间,也共享一个进程中的资源。这些线程的关系是并列的,一个线程修改进程的参数,其他进程都会受影响。

开启线程

方式一

from threading import Thread
import time,os def task():
print('%s is running' %os.getpid())#os.getpid()进程id
time.sleep(5)
print('%s is done' %os.getpid()) if __name__ == '__main__':
t=Thread(target=task,)#函数名,元组式的参数
t.start() print('主')
#运行结果
#580 is running
#主
#580 is done #开启子线程和开启子进程方法都是一样的,只是导入的模块不同
#使用子进程执行上面的代码会先打印“主”
#而使用子线程会先打印“580 is running”说明了开启线程是比开启进程要快的

继承的方式

from threading import Thread
import time,os class Mythread(Thread):
def __init__(self):
super().__init__() def run(self):
print('%s is running' % os.getpid())
time.sleep(5)
print('%s is done' % os.getpid()) if __name__ == '__main__':
t=Mythread()
t.start()

其他方法

from threading import Thread,current_thread,enumerate,active_count
import time def task():
print('%s is running' %current_thread().getName())
time.sleep(5)
print('%s is done' %current_thread().getName()) if __name__ == '__main__':
t=Thread(target=task,name='xxxx') #可以为进程指定名字不指定默认从Thread-1开始
t.start() #查看当前活着的线程
print(enumerate()) #返回活着进程信息的列表
print(active_count()) # 查看进程数
print('主',current_thread().getName()) # 返回进程名字 #运行结果 #xxxx is running
#[<_MainThread(MainThread, started 11784)>, <Thread(xxxx, started 5700)>]
#2
#主 MainThread
#xxxx is done

同样,多线程中也存在start(), 和join()

线程池

同样一个进程中不可能无限开辟线程,于是就有了线程池,线程池的作用与进程池相似,是规定了开辟最大线程数 --  也可以通过result()方法拿到返回值

from concurrent.futures import ThreadPoolExecutor
import time,os,random
def piao(name,n):
print("%s is piaoing %s"%(name,os.getpid())) # 打印了进程id
time.sleep(random.randint(1,3)) if __name__ == "__main__":
p = ThreadPoolExecutor(4) # 指定线程池最大线程个数,不包含控制线程
for i in range(10):
obj = p.submit(piao,"alex %s"%i,i) # 提交线程
print("主") #是不是和开启子进程方法一模一样,只是名字不同 #执行结果就像之前说的那样,开启线程要比开启进程快的多,所以控制线程的结果在子线程之后打印出来
#执行结果
#alex 0 is piaoing 8764
#alex 1 is piaoing 8764
#alex 2 is piaoing 8764
#alex 3 is piaoing 8764
#主 #alex 4 is piaoing 8764
#alex 5 is piaoing 8764
#alex 6 is piaoing 8764
#alex 7 is piaoing 8764 #alex 8 is piaoing 8764 #alex 9 is piaoing 8764

守护线程

守护控制线程

from threading import Thread
import os,time,random def task():
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
# p = Thread(target=time.sleep, args=(3,))
# p.start() if __name__ == '__main__':
p=Thread(target=task)
p.daemon = True #1、必须在p.start()之前 2:守护线程可以开启子线程
p.start()
print('主') #运行结果
#6120 is running
#主

ps : 守护线程是可以开启子线程的

from threading import Thread
import os,time,random def task():
print('%s is running' %os.getpid())
time.sleep(2)
print('%s is done' %os.getpid())
p = Thread(target=print, args=("子线程中的子线程",))
p.start() if __name__ == '__main__':
p=Thread(target=task)
p.daemon = True #1、必须在p.start()之前 2:守护进程不能开启子进程
p.start()
p.join()
print('主') #结果
#14368 is running
#14368 is done
#子线程中的子线程
#主

线程之间数据共享,如果多个线程同时修改数据,会产生意外的结果

from threading import Thread,Lock
import time
n=100 def task():
global n
temp=n
time.sleep(0.1) #模仿处理时间
n=temp-1 if __name__ == '__main__':
# mutex=Lock()
t_l=[]
for i in range(100):
t=Thread(target=task)
t_l.append(t)
t.start()
for t in t_l:
t.join()
print(n)
# 运行结果-他们几乎是同时拿到了数据,进行处理,但拿到的是相同的数据
99

加锁方式

from threading import Thread,Lock
import time
n=100 def task(): #可以不传递mutex,线程共享资源,而进程必须要传过去
global n
mutex.acquire()
temp=n
time.sleep(0.1)
n=temp-1
mutex.release()
  #另外一种写法
  # global n
  # with mutex: 线程锁对象支持with语法来获取锁和释放锁
  # temp=n
  # time.sleep(0.1)
  # n=temp-1 if __name__ == '__main__':
mutex=Lock()
t_l=[]
for i in range(100):
t=Thread(target=task)
t_l.append(t)
t.start() for t in t_l:
t.join()
print(n)
#执行结果-进行加锁,使他们不能同时拿到数据,只有在一个进程结束之后才可以拿
#0

  

协程

cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长.

我们可以在代码级别进行保存状态和切换, 利用回调来执行IO操作. 尽可能的占用CPU资源.

如利用yield send来保存切换和保存状态

#串行执行
import time
def consumer(res):
'''任务1:接收数据,处理数据'''
pass def producer():
'''任务2:生产数据'''
res = 0
for i in range(10000000):
res +=i
return res start=time.time()
#串行执行
res=producer()
consumer(res) #写成consumer(producer())会降低执行效率
stop=time.time()
print(stop-start) #0.571678876876831 #基于yield并发执行
import time
def consumer():
'''任务1:接收数据,处理数据'''
y = 0
while True:
x =yield
y+=x def producer():
'''任务2:生产数据'''
g=consumer()
next(g) # 激活, 把第一次yield的值接收,也可以send(None)
for i in range(10000000):
g.send(i)
start=time.time()
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer() stop=time.time()
print(stop-start) #1.5842673778533936

协程的优缺点

优点如下:
#1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
#2. 单线程内就可以实现并发的效果,最大限度地利用cpu 缺点如下
#1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
#2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程

协程模块

from gevent import monkey;monkey.patch_all()
import gevent
import time
# gevent本身只支持切换自身IO阻塞,需要导入monkey;monkey.patch_all()来实现遇到IO切,一定要写在最前面,意思是支持切换下面的IO操作。 def eat(name):
print('%s eat 1' %name)
time.sleep(2)
print('%s eat 2' %name) def play(name):
print('%s play 1' %name)
time.sleep(3)
print('%s play 2' %name) g1=gevent.spawn(eat,'alex')
g2=gevent.spawn(play,'egon') g1.join()
g2.join() # join是等待协程完成,这里如果不使用join主线程执行完毕这个进程就结束了,不会打印内容
#上面两个join可写为gevent.joinall([g1,g2]) #元组列表都可以
#alex eat 1
#egon play 1
#alex eat 2
#egon play 2

GIL锁

全局解释器锁(Global Interpreter Lock)是计算机程序设计语言解释器用于同步线程的工具,使得任何时刻仅有一个线程在执行。

全局解释器锁的本质是互斥锁,不过套在python解释器(一个进程)上的,这就使得同一时刻只有一个线程拿到python解释器的执行权限,就是同一时刻只有一个线程在执行,而Lock是套在代码段上,使得同一时刻只有一个线程来使用这个代码的功能。

在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,也就是说python没用了?

#分析:
我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
方案一:开启四个进程
方案二:一个进程下,开启四个线程 #单核情况下,分析结果:
  如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
  如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜 #多核情况下,分析结果:
  如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
  如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜 #结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如
串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。

进程之间通信的方式

无名管道( pipe ): - 管道是一种半双工的通信方式,数据只能单向流动,而且只能在具有亲缘关系的进程间使用。进程的亲缘关系通常是指父子进程关系。
高级管道(popen):  - 将另一个程序当做一个新的进程在当前程序进程中启动,则它算是当前程序的子进程,这种方式我们成为高级管道方式。
有名管道(named pipe) : - 有名管道也是半双工的通信方式,但是它允许无亲缘关系进程间的通信。
消息队列( message queue ) : - 消息队列是由消息的链表,存放在内核中并由消息队列标识符标识。消息队列克服了信号传递信息少、管道只能承载无格式字节流以及缓冲区大小受限等缺点。
信号量( semophore ) :- 信号量是一个计数器,可以用来控制多个进程对共享资源的访问。它常作为一种锁机制,防止某进程正在访问共享资源时,其他进程也访问该资源。因此,主要作为进程间以及同一进程内不同线程之间的同步手段。
信号 ( sinal ) : - 信号是一种比较复杂的通信方式,用于通知接收进程某个事件已经发生。
共享内存( shared memory ) : - 共享内存就是映射一段能被其他进程所访问的内存,这段共享内存由一个进程创建,但多个进程都可以访问。共享内存是最快的 IPC 方式,它是针对其他进程间通信方式运行效率低而专门设计的。它往往与其他通信机制,如信号两,配合使用,来实现进程间的同步和通信。
套接字( socket ):- 套解口也是一种进程间通信机制,与其他通信机制不同的是,它可用于不同机器间的进程通信。

基于队列实现的生产者消费者模型

基础模型

from multiprocessing import Process,Queue
import time,random,os def procducer(q):
for i in range(10):
res='包子%s' %i
time.sleep(0.5)
q.put(res) # 放值
print('%s 生产了 %s' %(os.getpid(),res)) def consumer(q):
while True:
res=q.get() # 取值
print('%s 吃 %s' %(os.getpid(),res))
time.sleep(random.randint(2,3)) if __name__ == '__main__':
q=Queue()
p=Process(target=procducer,args=(q,)) # 给两个进程传递一个队列
c=Process(target=consumer,args=(q,)) p.start()
c.start()
print('主')

  

此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

生产者发送信号

from multiprocessing import Process,Queue
import time,random,os def procducer(q):
for i in range(10):
res='包子%s' %i
time.sleep(0.5)
q.put(res)
print('%s 生产了 %s' %(os.getpid(),res))
q.put(None) # 生产完毕 def consumer(q):
while True:
res=q.get()
if res is None:
break
print('%s 吃 %s' %(os.getpid(),res))
time.sleep(random.randint(2,3)) if __name__ == '__main__':
q=Queue()
p=Process(target=procducer,args=(q,))
c=Process(target=consumer,args=(q,)) p.start()
c.start()
print('主')

主进程发送信号

from multiprocessing import Process,Queue
import time,random,os def procducer(q):
for i in range(10):
res='包子%s' %i
time.sleep(0.5)
q.put(res)
print('%s 生产了 %s' %(os.getpid(),res)) def consumer(q):
while True:
res=q.get()
if res is None:
break
print('%s 吃 %s' %(os.getpid(),res))
time.sleep(random.randint(2,3)) if __name__ == '__main__':
q=Queue()
p=Process(target=procducer,args=(q,))
c=Process(target=consumer,args=(q,)) p.start()
c.start() p.join()
q.put(None)
print('主')

  注意:不能以队列是否为空来判断消费者是否结束,因为可能生产者没有及时生产,队列开始时为空。

JoinableQueue队列

JoinableQueue 比Queue多了task_done() 与join()两个函数

task_done() 是用在get()后,告诉os, 我get完了,

join()是说Queue里所有的items都被拿出来搞完了。

  put时有一个计数器,在task_done的次数等于put次数时,生产者才会结束。join就是等待生产者结束。可以将消费者设为守护进程,因为生产者结束就意味着消费者已经把生产者生产的数据取完了。让消费者不必在等待

from multiprocessing import Process,Queue,JoinableQueue
import time,random,os def procducer(food,q):
for i in range(3):
res='%s%s' %(food,i)
time.sleep(0.5)
q.put(res)
print('%s 生产了 %s' %(os.getpid(),res))
q.join() # 等待消费完毕时执行完毕 def consumer(q):
while True:
res=q.get()
print('%s 吃 %s' %(os.getpid(),res))
time.sleep(random.randint(2,3))
q.task_done() # 消费一个 if __name__ == '__main__':
q=JoinableQueue()
p1=Process(target=procducer,args=('包子',q,)) # 生产包子的生产者
p2=Process(target=procducer,args=('饺子',q,)) # 生产饺子的生产者
p3=Process(target=procducer,args=('面条',q,)) # 生产面条的生产者
c1=Process(target=consumer,args=(q,)) # 两个守护进程的消费者
c2=Process(target=consumer,args=(q,)) c1.daemon=True
c2.daemon=True p1.start()
p2.start()
p3.start()
c1.start()
c2.start() p1.join()
p2.join()
p3.join()

信号量

信号量规定了能够同时进行这个功能的进程或线程的个数。有些类似锁(锁是只有一把钥匙, 这个可以设置钥匙数量)

进程间的信号量

from multiprocessing import Process,Semaphore
import time,random,os def task(sm):
with sm:
print('%s 上厕所' %os.getpid())
time.sleep(random.randint(1,3))
#相当于
#sm.acquire()
#print('%s 上厕所' % os.getpid())
#time.sleep(random.randint(1,3))
#sm.release() if __name__ == '__main__':
sm=Semaphore(3)
for i in range(10):
p=Process(target=task,args=(sm,))
p.start()
#执行结果
3756 上厕所
12456 上厕所
3668 上厕所 8756 上厕所
14836 上厕所
8332 上厕所 13508 上厕所
10332 上厕所 15944 上厕所
12044 上厕所

线程间的信号量

from threading import Thread,Semaphore
import time,random,os,current_thread def task(sm):
with sm:
print('%s 上厕所' %current_thread().getName())
time.sleep(random.randint(1,3))
if __name__ == '__main__':
sm=Semaphore(3)
for i in range(10):
p=Thread(target=task,args=(sm,))
p.start()
#结果
Thread-1 上厕所
Thread-2 上厕所
Thread-3 上厕所 Thread-4 上厕所 Thread-5 上厕所 Thread-6 上厕所
Thread-7 上厕所 Thread-8 上厕所 Thread-9 上厕所 Thread-10 上厕所

进程与线程的回调函数

需要回调函数的场景:进(线)程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进(线)程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进(线)程池中,然后指定回调函数,这样主进(线)程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

进程回调

进程中的回调函数由主进程来完成

from concurrent.futures import ProcessPoolExecutor
from threading import current_thread
import time,random,os
def task(n):
print('%s is running in %s' %(current_thread().getName(),os.getpid())) #打印线程名字,进程id
time.sleep(2)
return n**2
def func(n): print(str(n.result())+"执行完啦","由%s执行的"%os.getpid()) #打印进程id
time.sleep(2) if __name__ == '__main__':
start = time.time()
t=ProcessPoolExecutor(3) #默认是cpu的核数*5
for i in range(5):
t.submit(task,i).add_done_callback(func) # 指定回调函数
t.shutdown(wait=True)
end = time.time()
print(end - start)
print('主',current_thread()) # 运行结果 回调函数都是同一个进程id MainThread is running in 3504
MainThread is running in 14188
MainThread is running in 13180
MainThread is running in 3504
0执行完啦 由9616执行的
1执行完啦 由9616执行的
MainThread is running in 14188
4执行完啦 由9616执行的
9执行完啦 由9616执行的
16执行完啦 由9616执行的
12.142156839370728
主 <_MainThread(MainThread, started 11004)>

线程回调

线程中的回调函数不由控制线程完成

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from threading import current_thread,enumerate,active_count
import time,random
def task(n):
print('%d is running in %s' %(i,current_thread().getName())) #打印线程名字
time.sleep(2)
return n**2
def func(obj): # 模仿处理返回值。拿到结果后停两秒,相当于处理结果
print(obj.result(), current_thread().getName()) # 打印线程名字
time.sleep(2) if __name__ == '__main__':
start = time.time()
t=ThreadPoolExecutor(3) #默认是cpu的核数*5
for i in range(5):
t.submit(task, i).add_done_callback(func) # print(obj.result())
end = time.time()
print(end-start)
print('主',current_thread()) # 不会在控制线程中运行
0 is running in ThreadPoolExecutor-0_0
1 is running in ThreadPoolExecutor-0_1
2 is running in ThreadPoolExecutor-0_2
0.0009913444519042969
主 <_MainThread(MainThread, started 6812)>
0 ThreadPoolExecutor-0_0
1 ThreadPoolExecutor-0_1
4 ThreadPoolExecutor-0_2
4 is running in ThreadPoolExecutor-0_0
4 is running in ThreadPoolExecutor-0_1
9 ThreadPoolExecutor-0_0
16 ThreadPoolExecutor-