Python等待队列和事件

时间:2022-11-10 00:01:33

I have a queue and an event. I would like to exit the loop when the event is set to True, however there is a queue.get() in the loop which blocks until there is something in it.

我有一个队列和一个事件。我想在事件设置为True时退出循环,但是循环中有一个queue.get()阻塞,直到有东西存在。

How can I abort the waiting of the self._commandQueue.get() when the closeEvent Event flag is set?

当设置closeEvent Event标志时,如何中止self._commandQueue.get()的等待?

Note: I want to avoid depending on the blocking nature of the queue and want to block based on the condition of the queue and the eventflag

注意:我想避免依赖于队列的阻塞性质,并希望根据队列的条件和事件标志进行阻止

def _execute(self):
    while not self._closeEvent.isSet():
        nextCommand = self._commandQueue.get()
        self._commandExecutor.execute(nextCommand)
        self._commandQueue.task_done()

4 个解决方案

#1


3  

You would need something like the Windows WaitForMultipleObjects() call, but the python event and queue API don't offer such a beast (but you could use win32api to use that if you are strictly windows), so if you really need BOTH event sources to be checked in parallel, the answer is 'you cannot without polling (or monkey patching the Event class to allow it)'.

你需要类似Windows WaitForMultipleObjects()调用的东西,但是python事件和队列API不提供这样的野兽(但如果你是严格的windows,你可以使用win32api来使用它),所以如果你真的需要两个事件来源要并行检查,答案是“你不能没有轮询(或猴子修补事件类允许它)”。

But if you are a bit more flexible, you can arrange something like it, by redefining your command queue a bit. If the command queue is a PriorityQueue, you could queue your normal jobs with normal priority and have an extra process queue a 'STOP' token with higher priority, once your event signals.

但是如果你更灵活,你可以通过重新定义命令队列来安排类似的东西。如果命令队列是PriorityQueue,则可以使用正常优先级对正常作业进行排队,并在事件发出信号后为额外的进程队列设置具有更高优先级的“STOP”令牌。

STOP = None

def _execute(self):
    while 1:
        nextCommand = self._commandQueue.get()[1]
        if nextCommand is STOP:
           break
        self._commandExecutor.execute(nextCommand)
        self._commandQueue.task_done()

def wait_for_stop_signal(self):
    self._closeEvent.wait()
    self._commandQueue.put((-1, STOP))

Now you run wait_for_stop_signal in its own thread, and you have the behaviour you want (but waste one thread instead of polling, pick whats worse for your use case).

现在你在自己的线程中运行wait_for_stop_signal,并且你有你想要的行为(但是浪费一个线程而不是轮询,为你的用例选择更糟糕的事情)。

#2


3  

Queue uses events internally according to http://hg.python.org/cpython/file/3a1db0d2747e/Lib/Queue.py#l150 Specifically it uses self.not_empty event which your application could also wait on before attempting a Queue.get_nowait call.

队列根据http://hg.python.org/cpython/file/3a1db0d2747e/Lib/Queue.py#l150在内部使用事件。具体来说,它使用self.not_empty事件,您的应用程序也可以在尝试Queue.get_nowait调用之前等待。

As for waiting on several events there's Python threading: can I sleep on two threading.Event()s simultaneously? question with some code examples.

至于等待几个事件的Python线程:我可以同时睡两个threading.Event()吗?一些代码示例的问题。

#3


1  

I offer these two alternatives, which may suit:

我提供这两种选择,可能适合:

Solution 1

Put a "magic quit marker" into your queue. I will use None, you can use a special Quit message if desired. This is more or less the same as using priority queue method, but (a) it is a bit simpler, (b) BUT the queue will have to empty up to the marker point, which may or may not be acceptable for you.

将“魔术退出标记”放入队列中。我将使用None,如果需要,您可以使用特殊的Quit消息。这与使用优先级队列方法大致相同,但是(a)它稍微简单一点,(b)但是队列必须清空标记点,这可能是您可能接受的,也可能是不可接受的。

For example:

例如:

# Untested
# Processing thread
def run(self):
    while True:
        msg = self.main_queue.get()
        if msg == None:
            break
        ...do..queue..work...

def shutdown():
    self.main_queue.put(None)

Solution 2

Instead of calling Queue.get straight away. Create an event object that is signalled when either: * Something is put in the queue. * You want to shutdown

而不是直接调用Queue.get。创建一个事件对象,该对象在以下情况之间发出信号:*将某些内容放入队列中。 *你想关机

Instead of exposing the queue to the outside world, define a method to add to queue:

而不是将队列暴露给外部世界,定义一个添加到队列的方法:

def add_to_queue(self, msg):
    self.main_queue.put(msg)
    self.my_event.set() # Signal the consumer

def shutdown(self, msg):
    self.quit_event.set() # Set an event / condvar / msg to indicate exit
    self.my_event.set() # Signal something has happened.




def run(self)
    while True:
        self.my_event.wait()
        if self.quit_event.is_set()
            break
        msg = self.main_queue.get(False) # Empty exception should not happen here

You don't necessarily need a quit event as you can assume an empty queue and a signalled my_event means it's time to quit. Although, I think it's better to explicitly use a proper indicator - message, event, condition var, etc.

您不一定需要退出事件,因为您可以假设一个空队列,并且发出信号my_event意味着它是时候退出。虽然,我认为最好明确使用适当的指标 - 消息,事件,条件变量等。

#4


0  

Queue.get is a blocking method when it's called without parameters.

Queue.get是一个在没有参数的情况下调用的阻塞方法。

From docs:

来自docs:

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

将项目放入队列。如果可选的args块为true且timeout为None(默认值),则在必要时阻塞,直到有空闲插槽可用。如果timeout是一个正数,则它会阻止最多超时秒,如果在该时间内没有可用的空闲槽,则会引发Full异常。否则(块为假),如果空闲插槽立即可用,则将项目放在队列中,否则引发完全异常(在这种情况下忽略超时)。

You need to do something like that:

你需要做那样的事情:

try:
    # If `False`, the program is not blocked, it will throw the Queue.Empty exception.
    my_data = queue.get(False)  
    .....Some Code......
except Queue.Empty:
    my_data = None # or whatever you want

More options

更多的选择

  1. get_nowait:

    get_nowait:

    Queue.get_nowait()

    Queue.get_nowait()

    which is equivalent to get(False).

    这相当于get(False)。

  2. Using timeout:

    使用超时:

    my_data = queue.get(True, 5)

    my_data = queue.get(True,5)

    This will try to get for 5 secends if the get will fail (nothing to get) it will raise the same exception Queue.Empty

    这将尝试获得5个sece如果get将失败(没有得到)它将引发相同的异常Queue.Empty

#1


3  

You would need something like the Windows WaitForMultipleObjects() call, but the python event and queue API don't offer such a beast (but you could use win32api to use that if you are strictly windows), so if you really need BOTH event sources to be checked in parallel, the answer is 'you cannot without polling (or monkey patching the Event class to allow it)'.

你需要类似Windows WaitForMultipleObjects()调用的东西,但是python事件和队列API不提供这样的野兽(但如果你是严格的windows,你可以使用win32api来使用它),所以如果你真的需要两个事件来源要并行检查,答案是“你不能没有轮询(或猴子修补事件类允许它)”。

But if you are a bit more flexible, you can arrange something like it, by redefining your command queue a bit. If the command queue is a PriorityQueue, you could queue your normal jobs with normal priority and have an extra process queue a 'STOP' token with higher priority, once your event signals.

但是如果你更灵活,你可以通过重新定义命令队列来安排类似的东西。如果命令队列是PriorityQueue,则可以使用正常优先级对正常作业进行排队,并在事件发出信号后为额外的进程队列设置具有更高优先级的“STOP”令牌。

STOP = None

def _execute(self):
    while 1:
        nextCommand = self._commandQueue.get()[1]
        if nextCommand is STOP:
           break
        self._commandExecutor.execute(nextCommand)
        self._commandQueue.task_done()

def wait_for_stop_signal(self):
    self._closeEvent.wait()
    self._commandQueue.put((-1, STOP))

Now you run wait_for_stop_signal in its own thread, and you have the behaviour you want (but waste one thread instead of polling, pick whats worse for your use case).

现在你在自己的线程中运行wait_for_stop_signal,并且你有你想要的行为(但是浪费一个线程而不是轮询,为你的用例选择更糟糕的事情)。

#2


3  

Queue uses events internally according to http://hg.python.org/cpython/file/3a1db0d2747e/Lib/Queue.py#l150 Specifically it uses self.not_empty event which your application could also wait on before attempting a Queue.get_nowait call.

队列根据http://hg.python.org/cpython/file/3a1db0d2747e/Lib/Queue.py#l150在内部使用事件。具体来说,它使用self.not_empty事件,您的应用程序也可以在尝试Queue.get_nowait调用之前等待。

As for waiting on several events there's Python threading: can I sleep on two threading.Event()s simultaneously? question with some code examples.

至于等待几个事件的Python线程:我可以同时睡两个threading.Event()吗?一些代码示例的问题。

#3


1  

I offer these two alternatives, which may suit:

我提供这两种选择,可能适合:

Solution 1

Put a "magic quit marker" into your queue. I will use None, you can use a special Quit message if desired. This is more or less the same as using priority queue method, but (a) it is a bit simpler, (b) BUT the queue will have to empty up to the marker point, which may or may not be acceptable for you.

将“魔术退出标记”放入队列中。我将使用None,如果需要,您可以使用特殊的Quit消息。这与使用优先级队列方法大致相同,但是(a)它稍微简单一点,(b)但是队列必须清空标记点,这可能是您可能接受的,也可能是不可接受的。

For example:

例如:

# Untested
# Processing thread
def run(self):
    while True:
        msg = self.main_queue.get()
        if msg == None:
            break
        ...do..queue..work...

def shutdown():
    self.main_queue.put(None)

Solution 2

Instead of calling Queue.get straight away. Create an event object that is signalled when either: * Something is put in the queue. * You want to shutdown

而不是直接调用Queue.get。创建一个事件对象,该对象在以下情况之间发出信号:*将某些内容放入队列中。 *你想关机

Instead of exposing the queue to the outside world, define a method to add to queue:

而不是将队列暴露给外部世界,定义一个添加到队列的方法:

def add_to_queue(self, msg):
    self.main_queue.put(msg)
    self.my_event.set() # Signal the consumer

def shutdown(self, msg):
    self.quit_event.set() # Set an event / condvar / msg to indicate exit
    self.my_event.set() # Signal something has happened.




def run(self)
    while True:
        self.my_event.wait()
        if self.quit_event.is_set()
            break
        msg = self.main_queue.get(False) # Empty exception should not happen here

You don't necessarily need a quit event as you can assume an empty queue and a signalled my_event means it's time to quit. Although, I think it's better to explicitly use a proper indicator - message, event, condition var, etc.

您不一定需要退出事件,因为您可以假设一个空队列,并且发出信号my_event意味着它是时候退出。虽然,我认为最好明确使用适当的指标 - 消息,事件,条件变量等。

#4


0  

Queue.get is a blocking method when it's called without parameters.

Queue.get是一个在没有参数的情况下调用的阻塞方法。

From docs:

来自docs:

Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the Full exception if no free slot was available within that time. Otherwise (block is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (timeout is ignored in that case).

将项目放入队列。如果可选的args块为true且timeout为None(默认值),则在必要时阻塞,直到有空闲插槽可用。如果timeout是一个正数,则它会阻止最多超时秒,如果在该时间内没有可用的空闲槽,则会引发Full异常。否则(块为假),如果空闲插槽立即可用,则将项目放在队列中,否则引发完全异常(在这种情况下忽略超时)。

You need to do something like that:

你需要做那样的事情:

try:
    # If `False`, the program is not blocked, it will throw the Queue.Empty exception.
    my_data = queue.get(False)  
    .....Some Code......
except Queue.Empty:
    my_data = None # or whatever you want

More options

更多的选择

  1. get_nowait:

    get_nowait:

    Queue.get_nowait()

    Queue.get_nowait()

    which is equivalent to get(False).

    这相当于get(False)。

  2. Using timeout:

    使用超时:

    my_data = queue.get(True, 5)

    my_data = queue.get(True,5)

    This will try to get for 5 secends if the get will fail (nothing to get) it will raise the same exception Queue.Empty

    这将尝试获得5个sece如果get将失败(没有得到)它将引发相同的异常Queue.Empty