Is this Python producer/consumer lockless approach thread-safe?

Time: 2022-12-17 11:47:53

I recently wrote a program that used a simple producer/consumer pattern. It initially had a bug related to improper use of threading.Lock, which I eventually fixed. But it made me wonder whether it's possible to implement the producer/consumer pattern in a lockless manner.

Requirements in my case were simple:

  • One producer thread.

  • One consumer thread.

  • The queue has room for only one item.

  • The producer can produce the next item before the current one is consumed. The current item is therefore lost, but that's OK for me.

  • The consumer can consume the current item before the next one is produced. The current item is therefore consumed twice (or more), but that's OK for me.

So I wrote this:

QUEUE_ITEM = None

# this is executed in one threading.Thread object
def producer():
    global QUEUE_ITEM
    while True:
        i = produce_item()
        QUEUE_ITEM = i

# this is executed in another threading.Thread object
def consumer():
    global QUEUE_ITEM
    while True:
        i = QUEUE_ITEM
        consume_item(i)
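
For completeness, the two functions are meant to be run on two threading.Thread objects, along these lines (a sketch; produce_item and consume_item are assumed to be defined elsewhere):

import threading

producer_thread = threading.Thread(target=producer, daemon=True)
consumer_thread = threading.Thread(target=consumer, daemon=True)
producer_thread.start()
consumer_thread.start()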

My question is: Is this code thread-safe?

Immediate comment: this code isn't really lockless - I use CPython, and it has the GIL.

I tested the code a little and it seems to work. It translates to a few LOAD and STORE ops, which are atomic because of the GIL. But I also know that the del x operation isn't atomic when x implements a __del__ method. So if my item has a __del__ method and some nasty scheduling happens, things may break. Or not?
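
One way to check the LOAD/STORE claim is to disassemble the assignment with the standard dis module. A quick sketch (publish is just a throwaway helper for the disassembly; exact opcode names vary between CPython versions):

import dis

QUEUE_ITEM = None

def publish(item):
    global QUEUE_ITEM
    QUEUE_ITEM = item  # rebinding the global compiles to a single STORE_GLOBAL

dis.dis(publish)  # prints a LOAD of item followed by STORE_GLOBAL (QUEUE_ITEM)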

Another question is: what kind of restrictions (for example, on the produced items' type) do I have to impose to make the above code work fine?

My questions are only about the theoretical possibility of exploiting CPython's and the GIL's quirks in order to come up with a lockless solution (i.e. no explicit locks like threading.Lock in the code).

6 Answers

#1


Yes, this will work in the way that you described:

  1. The producer may produce a skippable element.

  2. The consumer may consume the same element.

But I also know that the del x operation isn't atomic when x implements a __del__ method. So if my item has a __del__ method and some nasty scheduling happens, things may break.

I don't see a "del" here. If a del happens inside consume_item, the actual __del__ may still run in the producer thread. I don't think this would be a "problem".

Don't bother using this, though. You will end up burning CPU on pointless polling cycles, and it is not any faster than using a queue with locks, since Python already has a global lock.

#2


Trickery will bite you. Just use Queue to communicate between threads.
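
For illustration, a minimal sketch of the same producer/consumer built on queue.Queue (the Queue module in Python 2); produce_item and consume_item are the same placeholders as in the question, and maxsize=1 mirrors the one-slot requirement. Note the semantics differ from the question's version: put() and get() block instead of overwriting or re-consuming items.

import queue
import threading

q = queue.Queue(maxsize=1)  # one-slot queue

def producer():
    while True:
        q.put(produce_item())    # blocks while the slot is full

def consumer():
    while True:
        consume_item(q.get())    # blocks while the slot is empty

threading.Thread(target=producer, daemon=True).start()
threading.Thread(target=consumer, daemon=True).start()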

#3


This is not really thread-safe, because the producer could overwrite QUEUE_ITEM before the consumer has consumed it, and the consumer could consume QUEUE_ITEM twice. As you mentioned, you're OK with that, but most people aren't.

Someone with more knowledge of CPython internals will have to answer your more theoretical questions.

#4


I think it's possible that a thread is interrupted while producing/consuming, especially if the items are big objects. Edit: this is just a wild guess. I'm no expert.

Also, the threads may produce/consume any number of items before the other one starts running.

#5


You can use a list as the queue as long as you stick to append/pop, since both are atomic in CPython.

QUEUE = []

# this is executed in one threading.Thread object
def producer():
    global QUEUE
    while True:
        i = produce_item()
        QUEUE.append(i)

# this is executed in another threading.Thread object
def consumer():
    global QUEUE
    while True:
        try:
            i = QUEUE.pop(0)
        except IndexError:
            # queue is empty
            continue

        consume_item(i)

In a class scope like below, you can even clear the queue.

class Atomic(object):
    def __init__(self):
        self.queue = []

    # this is executed in one threading.Thread object
    def producer(self):
        while True:
            i = produce_item()
            self.queue.append(i)

    # this is executed in another threading.Thread object
    def consumer(self):
        while True:
            try:
                i = self.queue.pop(0)
            except IndexError:
                # queue is empty
                continue

            consume_item(i)

    # There's the possibility the producer is still working on its current item.
    def clear_queue(self):
        self.queue = []

You'll have to find out which list operations are atomic by looking at the bytecode generated.
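
For example (a sketch; push/pull are throwaway wrappers for the disassembly): the disassembly shows a few opcodes per call, but the append/pop themselves are single C-level calls that CPython executes while holding the GIL, which is what makes them safe to share between the two threads.

import dis

def push(queue, item):
    queue.append(item)   # one C-level list.append call under the GIL

def pull(queue):
    return queue.pop(0)  # likewise a single C-level list.pop call

dis.dis(push)
dis.dis(pull)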

#6


The __del__ could be a problem, as you said. It could be avoided if there were a way to prevent the garbage collector from invoking the __del__ method on the old object before we finish assigning the new one to QUEUE_ITEM. We would need something like:

increase the reference counter on the old object
assign a new one to `QUEUE_ITEM`
decrease the reference counter on the old object

I'm afraid I don't know if it is possible, though.
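
For what it's worth, a plain name binding is itself a reference-count increment in CPython, so a minimal sketch of that sequence could look like the following (replace_item is a hypothetical helper, not part of the original code):

QUEUE_ITEM = None

def replace_item(new_item):
    global QUEUE_ITEM
    old = QUEUE_ITEM       # extra reference keeps the old object alive
    QUEUE_ITEM = new_item  # rebinding no longer drops the last reference
    del old                # if nothing else references it, the old object's
                           # __del__ runs here, in the producer thread

This only controls where the old object's __del__ runs; it does not make the __del__ method itself any safer.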
