[原创]如何编写多个阻塞队列连接下的多生产者多消费者的Python程序

时间:2024-01-15 15:35:38

平常在写程序时,往往会遇到一个需求:在程序的多个阶段都会出现阻塞的可能,因此,这多个阶段就需要并发执行。

Python的多线程有一个特点,就是不允许从外部结束一个运行中的线程,这给我们编写代码时带来了一定的困难。网上现存的多篇文章,往往都从单个阻塞队列出发来讲多线程,大家都知道然而这并没有什么卵用。本文将从一个需求出发,详细分析如何编写一个没有并发问题、可以保证在正确的时间内结束的多生产者、多消费者模式下的Python程序。

言归正传,首先是需求:

我有一个已知的ip段,比如64.233.16.0/20,我需要知道这里面有几个有效的能够提供443连接的ip地址,若能够提供有效443连接,那么测算可达ip的速度。

大概框架可以如此设计:

ip段---> Ip生成器(一个线程)===(阻塞队列a)===连接测试器(多个线程)===(阻塞队列b)===速度测试器(两个线程)

在明确了需要实现的框架后,我们再来看看手头都有什么弹药。

1、守护线程。什么是守护线程?我们可以通过守护线程的行为来定义它。对于一个Python进程而言,在该进程所属的所有非守护线程结束后,如果还存在守护线程在运行,那么这些守护线程就会直接被从外部终结。

2、阻塞队列。什么是阻塞队列?阻塞队列就是库提供给我们可以在多线程环境下安全运行的容器,一般都用来实现生产者消费者模型。阻塞队列一般提供两个方法,一个是put,往队列中投放元素,若队列容量满则put的调用线程阻塞;另一个是get,往队列中取元素,若队列中无元素则get的调用线程阻塞。

对于Python的阻塞队列实现Queue而言,还有一队非常重要的方法task_done()和join()。下面我直接抄官方文档了:

Queue.task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

New in version 2.5.

Queue.join()

Blocks until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

New in version 2.5.

可以看出,在面对生产者消费者模型时,我们先将生产者线程和消费者线程设置为守护线程,然后在主线程中用一个阻塞队列将生产者线程和消费者线程连接起来,再分别启动生产者线程和消费者线程,在消费者线程中,每消费队列中的一个元素并处理后都调用一次queue.task_done(),然后别忘了在主线程调用queue.join(),似乎就可以完美解决这个问题了。

但是,这有一个非常严重的问题:

如果生产者速度比消费者慢,那会发生什么?

由之前的描述看,如果队列中的元素被提前消耗完了,主线程就会从queue.task_done()中唤醒,那么在生产者还在生产新的元素时,就会因为主线程的退出而退出,这个时候,任务还没有结束。

而在我们这个需求中,虽然ip生成器的速度很快,不会出现这个问题,但连接测试器生产的速度远远不如速度测试器消费的速度快,立刻就会出现上述的情况,任务明明还没有结束,整个程序却得退出了。

解决方案:

1、理论

先来一个断言:在生产者生产结束后,主线程对该阻塞队列调用join方法,可以正确结束程序。

接下来是证明:不妨假设在生产者生产全部结束后(也就是待处理元素都在queue中了),主线程对阻塞队列调用join方法并没有正确结束程序。join方法取消阻塞状态的前提是队列中无待处理元素,由于没有正确结束程序,因此,此时生产者应该还在生产,但这和前提矛盾,故而得证。

2、实现

实际上,我们可以看出,要想正确结束一个多阻塞队列连接的多生产者多消费者的多线程程序,关键在于亮点:

1)主线程要知道第一组生产者何时结束。

2)在第一组生产者结束后,主线程按照阻塞队列的连接顺序对其分别调用join方法。

为什么这么做是对的?我们知道,保证被一个阻塞队列连接起来的多个生产者消费者正确结束,需要保证在对阻塞队列调用join方法以前生产者已经完成任务了。在第一组生产者结束后,此时对于第二组生产者,也就是第一组消费者而言,只需要拿完queue中残留的元素即可。

对于第二组生产者而言,我们可以这么写:

while True:
item = queue1.get()
process(item)
queue2.put(item)
queue1.task_done()

可以看出,在queue1的join方法解除阻塞状态时,此时第二组生产者也已经完成了生产,因为第二组生产者将元素放入下一个阻塞队列在对前一个阻塞队列调用task_done之前(有点拗口。。),故而维持了这一性质。

3、最后的问题:主线程如何得知第一组生产者何时结束?

由于第一组生产者直接和数据源打交道,因此,第一组生产者可以明白自己何时应该结束。具体到我们这个需求,由于给定了ip段,而IP段中的地址是有限的,故而ip生成器完全不需要用while True来保证线程的持续运行,而只需要这么写:

for ip in ip_list:
queue1.put(ip)

在明确了上面这点后,我们可以通过一个比较讨巧和优雅的办法来避免使用复杂而容易出错的notify机制,而让主线程轻易得知第一组生产者线程的结束时机!

那就是!

用一个阻塞队列将主线程和第一组生产者连接起来!

先看代码如何写:

# 主线程:
for thread in first_producer:
thread.set_notify_queue(queue_notify)
thread.start() for key in range(len(first_producer)):
queue_notify.put('any thing u like, i chose string') queue_notify.join() # 第一组生产者线程(可能有多个) queue_notify.get()
for ip in ip_list:
queue1.put(ip)
what_ever_other_thing()
queue_notify.task_done()

假设第一组生产者线程有20个

那么,连接第一组生产者线程和主线程的阻塞队列中,就有20个你喜欢的任意对象。由于每个生产者线程只拿一次,因此,可以保证人人有份。而在第一组生产者线程结束时,会调用一次task_done,因此,在所有第一组生产者线程结束后,主线程就能从queue_notify.join()中被唤醒,进而执行接下来的逻辑。

接下来什么逻辑?

前面说过了,按顺序挨个join你的阻塞队列们。

如此这般,我们就能完美而优雅的解决正确滴结束多个阻塞队列连接下的多生产者多消费者的Python程序这个问题(感觉像绕口令。。)