在什么情况下会阻塞队列。抛出打断异常?

时间:2022-12-09 07:09:19

Let us suppose that I have a thread that consumes items produced by another thread. Its run method is as follows, with inQueue being a BlockingQueue

假设我有一个线程,它使用另一个线程生成的项。它的运行方法如下,inQueue是一个BlockingQueue

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        w.consume();
    } catch (InterruptedException e) { 
        shutdown = true;
    }
}

Furthermore, a different thread will signal that there are no more work items by interrupting this running thread. Will take() throw an interrupted exception if it does not need to block to retrieve the next work item. i.e. if the producer signals that it is done filling the work queue, is it possible to accidentally leave some items in inQueue or miss the interrupt?

此外,另一个线程将通过中断运行的线程来通知没有其他工作项。如果不需要阻塞以检索下一个工作项,则将抛出一个中断异常。例如,如果生产者发出信号说它已经填满了工作队列,是否有可能不小心将一些项目留在队列中或错过中断?

4 个解决方案

#1


4  

A good way to signal termination of a blocking queue is to submit a 'poison' value into the queue that indicates a shutdown has occurred. This ensures that the expected behavior of the queue is honored. Calling Thread.interupt() is probably not a good idea if you care about clearing the queue.

表示阻塞队列终止的一个好方法是向队列提交一个“毒药”值,该值指示关闭已经发生。这确保队列的预期行为得到遵守。如果您关心清除队列,那么调用Thread.interupt()可能不是一个好主意。

To provide some code:

提供一些代码:

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        if (w == QUEUE_IS_DEAD)
          shutdown = true;
        else
          w.consume();
    } catch (InterruptedException e) { 
        // possibly submit QUEUE_IS_DEAD to the queue
    }
}

#2


3  

According to javadoc, the take() method will throw InterruptedException if interrupted while waiting.

根据javadoc, take()方法将在等待期间中断时抛出InterruptedException。

#3


2  

I wondered about the same thing and reading the javadoc for take() I believed that it would throw an interrupted exception only after having taken all the items in the queue, since if the queue had items, it would not have to "wait". But I made a small test:

我对同样的事情感到疑惑,并阅读了javadoc for take(),我认为只有在获取了队列中的所有项之后,它才会抛出一个中断的异常,因为如果队列有项,那么它就不必“等待”。但我做了一个小测试:

package se.fkykko.slask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueueTakeTest {

public static void main(String[] args) throws Exception {
    Runner t = new Runner();
    Thread t1 = new Thread(t);
    for (int i = 0; i < 50; i++) {
        t.queue.add(i);
    }
    System.out.println(("Number of items in queue: " + t.queue.size()));
    t1.start();
    Thread.sleep(1000);
    t1.interrupt();
    t1.join();
    System.out.println(("Number of items in queue: " + t.queue.size()));
    System.out.println(("Joined t1. Finished"));

}

private static final class Runner implements Runnable {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);
    AtomicLong m_count = new AtomicLong(0);

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                System.out.println("Took item " + m_count.incrementAndGet());
                final long start = System.currentTimeMillis();
                while ((System.currentTimeMillis() - start) < 100) {
                    Thread.yield(); //Spin wait
                }
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Interrupted. Count: " + m_count.get());
        }
    }
}

}

The runner will take 10-11 items and then finish i.e. take() will throw InterruptedException even if there still is items in the queue.

运行器将获取10-11个项目,然后完成,例如take()将抛出InterruptedException,即使队列中仍然有项目。

Summary: Use the Poison pill approach instead, then you have full control over how much is left in the queue.

总结:使用毒丸方法代替,然后你可以完全控制在队列中剩下多少。

#4


-1  

The java.concurrency.utils package was designed and implemented by some of the finest minds in concurrent programming. Also, interrupting threads as a means to terminate them is explicitly endorsed by their book "Java Concurrency in Practice". Therefore, I would be extremely surprised if any items were left in the queue due to an interrupt.

java.concurrency。utils包是由并发编程中一些最优秀的人设计和实现的。此外,中断线程作为终止它们的一种手段也得到了它们的《实践中的Java并发》一书的明确支持。因此,如果由于中断而将任何项目留在队列中,我会感到非常惊讶。

#1


4  

A good way to signal termination of a blocking queue is to submit a 'poison' value into the queue that indicates a shutdown has occurred. This ensures that the expected behavior of the queue is honored. Calling Thread.interupt() is probably not a good idea if you care about clearing the queue.

表示阻塞队列终止的一个好方法是向队列提交一个“毒药”值,该值指示关闭已经发生。这确保队列的预期行为得到遵守。如果您关心清除队列,那么调用Thread.interupt()可能不是一个好主意。

To provide some code:

提供一些代码:

boolean shutdown = false;
while (!shutdown) {
    try {
        WorkItem w = inQueue.take();
        if (w == QUEUE_IS_DEAD)
          shutdown = true;
        else
          w.consume();
    } catch (InterruptedException e) { 
        // possibly submit QUEUE_IS_DEAD to the queue
    }
}

#2


3  

According to javadoc, the take() method will throw InterruptedException if interrupted while waiting.

根据javadoc, take()方法将在等待期间中断时抛出InterruptedException。

#3


2  

I wondered about the same thing and reading the javadoc for take() I believed that it would throw an interrupted exception only after having taken all the items in the queue, since if the queue had items, it would not have to "wait". But I made a small test:

我对同样的事情感到疑惑,并阅读了javadoc for take(),我认为只有在获取了队列中的所有项之后,它才会抛出一个中断的异常,因为如果队列有项,那么它就不必“等待”。但我做了一个小测试:

package se.fkykko.slask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class BlockingQueueTakeTest {

public static void main(String[] args) throws Exception {
    Runner t = new Runner();
    Thread t1 = new Thread(t);
    for (int i = 0; i < 50; i++) {
        t.queue.add(i);
    }
    System.out.println(("Number of items in queue: " + t.queue.size()));
    t1.start();
    Thread.sleep(1000);
    t1.interrupt();
    t1.join();
    System.out.println(("Number of items in queue: " + t.queue.size()));
    System.out.println(("Joined t1. Finished"));

}

private static final class Runner implements Runnable {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(100);
    AtomicLong m_count = new AtomicLong(0);

    @Override
    public void run() {
        try {
            while (true) {
                queue.take();
                System.out.println("Took item " + m_count.incrementAndGet());
                final long start = System.currentTimeMillis();
                while ((System.currentTimeMillis() - start) < 100) {
                    Thread.yield(); //Spin wait
                }
            }
        }
        catch (InterruptedException ex) {
            System.out.println("Interrupted. Count: " + m_count.get());
        }
    }
}

}

The runner will take 10-11 items and then finish i.e. take() will throw InterruptedException even if there still is items in the queue.

运行器将获取10-11个项目,然后完成,例如take()将抛出InterruptedException,即使队列中仍然有项目。

Summary: Use the Poison pill approach instead, then you have full control over how much is left in the queue.

总结:使用毒丸方法代替,然后你可以完全控制在队列中剩下多少。

#4


-1  

The java.concurrency.utils package was designed and implemented by some of the finest minds in concurrent programming. Also, interrupting threads as a means to terminate them is explicitly endorsed by their book "Java Concurrency in Practice". Therefore, I would be extremely surprised if any items were left in the queue due to an interrupt.

java.concurrency。utils包是由并发编程中一些最优秀的人设计和实现的。此外,中断线程作为终止它们的一种手段也得到了它们的《实践中的Java并发》一书的明确支持。因此,如果由于中断而将任何项目留在队列中,我会感到非常惊讶。