Python 之并发编程之进程下(事件(Event())、队列(Queue)、生产者与消费者模型、JoinableQueue)

时间:2024-04-03 23:34:02

八:事件(Event())

# 阻塞事件:
    e = Event() 生成事件对象e
    e.wait() 动态给程序加阻塞,程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
    # 如果是True 不加阻塞
    # 如果是False 加阻塞
# 控制这个属性的值
    # set() 方法        将这个属性的值改成True
    # clear() 方法     将这个属性的值改成False
    #is_set() 方法      判断当前属性是否为True  (默认上来是False)

#(1)各方法基本使用

e = Event()
# 获取该对象中成员属性是True 还是False
print(e.is_set())
print(1)
# 用e.set() 把该对象的成员值变成True
e.set() # True
print(e.is_set())
e.wait()
print(2)
# 用e.clear() 把该对象的成员值变成False
e.clear()
print(3)
print(e.is_set())
e.wait()
print(4)

输出结果为:

False

1

True

2

3

False

4 不运行,4之前是False,是加阻塞,所有不运行。

#(2)模拟红绿灯效果

#例:

from multiprocessing import Process, Event
import time, random # traffic_light(e):
def traffic_light(e):
print("红绿灯")
while True: if e.is_set():
#print(e.is_set())
# 当前绿灯等待1秒
time.sleep(1)
# 切换成红灯
print("红灯亮")
e.clear()
#print(e.is_set())
else:
# 为当前红灯等待1秒
time.sleep(1)
# 等完1秒之后,变成绿灯
print("绿灯亮")
e.set() # 小车在遇到红灯时停,绿灯时行
def car(e, i):
if not e.is_set():
print("car %s 在等待" % (i))
# wait获取的值是False ,所有加阻塞
e.wait()
print("car %s 通行了" % (i)) """
# 车跑完了,但是红绿灯仍然执行
if __name__ == "__main__":
e = Event()
# 启动交通灯
p1 = Process(target=traffic_light, args=(e,))
p1.start() for i in range(10):
time.sleep(random.randrange(0, 2))
p2 = Process(target=car, args=(e, i))
p2.start() """ # ###优化版
# 车跑完了,红绿灯这个进程也结束掉
if __name__ == "__main__":
e = Event()
lst = [] #启动交通灯
p1 = Process(target=traffic_light, args=(e,))
p1.daemon = True
p1.start() for i in range(20):
time.sleep(random.randrange(0,2))
p2 = Process(target=car, args=(e,i))
p2.start()
lst.append(p2) # 循环等待每一辆车通过红绿灯
for i in lst:
i.join() print("程序彻底执行结束")

执行结果为:

红绿灯

car 0 在等待

绿灯亮

car 0 通行了

car 1 通行了

car 2 通行了

car 3 通行了

红灯亮

绿灯亮

car 4 通行了

红灯亮

car 5 在等待

绿灯亮

car 5 通行了

car 6 通行了

红灯亮

car 7 在等待

car 8 在等待

绿灯亮

car 7 通行了

car 8 通行了

car 10 通行了

car 9 通行了

car 12 通行了

car 11 通行了

红灯亮

car 13 在等待

绿灯亮

car 13 通行了

car 14 通行了

car 15 通行了

car 16 通行了

car 17 通行了

红灯亮

car 18 在等待

car 19 在等待

绿灯亮

car 19 通行了

car 18 通行了

程序彻底执行结束

运行结果解析:

Car每次通过的数量是不一定的,因为car子进程的产生是根据随机睡眠时间而定的。

如果把注释部分打开,将优化版注释,则是程序将一直执行,车走完了,红绿灯依然那边红绿打印。

九:队列Queue

#先进先出 原则

#(1)基本语法

from multiprocessing import Process,Queue
q = Queue()
# 1.把数据放q队列中 put
q.put(111)

# 2.把数据从队列里面拿出来 get
res = q.get()
print(res)

# 3.当队列里面的值都拿出来,已经没有数据的时候,在获取会阻塞
res = q.get() #直接阻塞
# 4.get_nowait() 无论有没有都拿,如果拿不到,直接报错

'''
get_nowait 内部要依赖queue模块来实现
没有完全优化好,不推荐使用,想用就用put和get分别设置和获取,就可以了
# res = q.get_nowait()  不推荐使用
# print(res)
# try .. except .. 捕捉异常
try:
   print(q.get_nowait())
# 特指queue.Empty 这种错误,执行某些逻辑.
except queue.Empty:
   print("empty")
except:
   print("其他")
'''

# (2)可以使用queue 指定队列的长度
#最多放3个,超过最大长度,就执行阻塞
#例:

from multiprocessing import Process,Queue
q = Queue(3)
q.put(1)
q.put(2)
q.put(3)
# print(q.get())
#q.put(4) #阻塞的情况
# q.put_nowait(4) #超过队列最大长度,在存值直接报错 (不推荐使用) # (了解 不常用 full empty)
# 队列值满了返回真,不满返回假
res = q.full()
print(res)
# 判断队列中是否为空
res = q.empty()
print(res)

输出结果为:

True

False

# (3) 进程之间的通讯

def func(q):
# 1.主进程添加的值,子进程可以通过队列拿到
res = q.get()
print("我是子进程", res)
q.put("a2222") if __name__ == "__main__":
q1 = Queue()
p = Process(target=func, args=(q1,))
p.start() #q1.get() #如果添加这个会阻塞,因为取不到值
# 主进程添加数据
q1.put("a111")
p.join()
# 2.子进程添加的值,主进程通过队列可以拿到
print("主进程:", q1.get())

结果输出:

我是子进程 a111

主进程: a2222

运行结果分析:

输出的主进程a222说明,数据子进程中被改变了,所有进程之间的数据可以改变的。

十. 生产者与消费者模型

(1)为什么要使用生产者消费者模型

  生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

(2)什么是生产者和消费者模式

  生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题,生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的

举例思路:

# 爬虫例子:
1号进程负责获取网页中所有内容
2号进程负责匹配提取网页中的关键字

1号进程就可以看成一个生产者
2号进程就可以看成一个消费者

有时可能生产者比消费者快,反之也是一样
所以生产者和消费者为了弥补之间速度的差异化,加了一个中间的缓冲队列.

生产者和消费者模型从程序上看就是一个存放和拿取的过程.
最为理想的生产者消费者模型是两者之间的运行速度相对同步.

#例:

from multiprocessing import Process, Queue
import random, time # 消费者模型[负责取值]
def consumer(q, name):
while True:
food = q.get()
if food is None:
break
time.sleep(random.uniform(0.1, 1))
print("1.%s吃了一个%s" % (name, food))
# 生产者模型 [负责存之值]
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.1,1))
print("2. %s 生产了 %s %s" % (name, food, i))
q.put(food+str(i)) if __name__ == "__main__":
q = Queue()
# 创建消费者进程对象
c1 = Process(target=consumer, args=(q,"one"))
# 如果设置守护进程,主进程直接当前消费者模型结束,不能够保证所有数据消费完毕,是一个弊端,而且不理想
c1.start() # 2号消费者
c2 = Process(target=consumer, args=(q,"two"))
c2.start() # 创建生产进程对象
p1 = Process(target=producer, args=(q,"producer_one","糖"))
p1.start() p2 = Process(target=producer, args=(q,"producer_two","巧克力"))
p2.start() p1.join()
p2.join()
q.put(None)
q.put(None)

运行后的结果如下:

2. producer_one 生产了 糖 0

2. producer_two 生产了 巧克力 0

1.one吃了一个巧克力0

1.two吃了一个糖0

2. producer_one 生产了 糖 1

2. producer_two 生产了 巧克力 1

1.one吃了一个糖1

1.two吃了一个巧克力1

2. producer_one 生产了 糖 2

2. producer_two 生产了 巧克力 2

1.one吃了一个糖2

2. producer_one 生产了 糖 3

1.two吃了一个巧克力2

2. producer_two 生产了 巧克力 3

1.one吃了一个糖3

2. producer_two 生产了 巧克力 4

2. producer_one 生产了 糖 4

1.two吃了一个巧克力3

1.one吃了一个巧克力4

1.two吃了一个糖4

结果分析:生产的糖或巧克力一定或被消费完,等消费完主进程才会结束

十一:JoinableQueue

1. 概述

JoinableQueue 与Queue一样也是multiprocessing模块中的一个类,也可以用于创建进程队列。

JoinableQueue 创建可连接的共享进程队列,队列允许队列的消费者通知生产者,队列数据已被成功处理完成。通知过程是使用共享的信号和条件变量来实现的。

2.常用方法与属性

(1)put 存入
(2)get 获取

JoinableQueue除了与Queue相同的方法之外,还具有2个特有的方法

(3)q.task_done()

消费者使用此方法发出信号,表示q.get()返回的项目已经被处理完成。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常。

每次get一次数据,就执行一次task_done(),可以让中间变量的值减1

(4)q.join()

 判断如果队列里面还有值,默认是要加阻塞的。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止。

task_done  join 通过一个中间变量统计列表元素个数
每放入一个值,队列元素个数加1
通过task_done让当前队列的元素数量减1
最后join查找统计队列的这个变量,如果是0,才不会添加阻塞,放行

 

#(1)基本用法

from multiprocessing import Process,JoinableQueue
jq = JoinableQueue()
jq.put(11)
print(jq.get())
jq.task_done() #如果没有加这一行代码,后面的代码不执行
jq.join()
print("finish")

#打印结果:
11
finish

下面对task_done 和 join进行举例理解:

#例:

# 优化生产者消费者模型
# 消费者模型 [负责取值]
def consumer(q,name):
while True:
food = q.get()
time.sleep(random.uniform(0.1,1)) #随机生成一个0.1到之间的小数
print("1. %s 吃了一个%s " %(name,food))
q.task_done() # 生产者模型 [负责存值]
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.1,1))
print("2. %s 生产了 %s %s" % (name,food,i))
q.put(food+str(i)) if __name__ == "__main__":
jq = JoinableQueue()
# 创建消费者
c1 = Process(target=consumer,args=(jq,"one"))
c1.daemon = True
c1.start() # 创建生产者
p1 = Process(target=producer, args=(jq,"two","meat"))
p1.start() p1.join()
# 必须等待队列里面的锁头数据都清空,判断值为0才放行
jq.join()
print("全部结束")

#打印结果为:

2. two 生产了 meat 0

2. two 生产了 meat 1

1. one 吃了一个meat0

1. one 吃了一个meat1

2. two 生产了 meat 2

1. one 吃了一个meat2

2. two 生产了 meat 3

1. one 吃了一个meat3

2. two 生产了 meat 4

1. one 吃了一个meat4

全部结束

结果分析:生产了才能进行消费,join添加了阻塞,只有消费完成程序才运行结束,而c1加了守护进程