《java.util.concurrent 包源码阅读》05 BlockingQueue

时间:2023-03-08 19:58:45

想必大家都很熟悉生产者-消费者队列,生产者负责添加元素到队列,如果队列已满则会进入阻塞状态直到有消费者拿走元素。相反,消费者负责从队列中拿走元素,如果队列为空则会进入阻塞状态直到有生产者添加元素到队列。BlockingQueue就是这么一个生产者-消费者队列。

BlockingQueue是Queue的子接口

public interface BlockingQueue<E> extends Queue<E>

BlockingQueue拿走元素时,如果队列为空,阻塞等待会有两种情况:

一种是一直等待直到队列不为空,这种情况调用take方法

E take() throws InterruptedException;

另一种就是设定一个超时时间,一直等到超时,这种情况调用的是pool方法

E poll(long timeout, TimeUnit unit) throws InterruptedException;

同样对于添加元素来说,也有两种情况:

一直等待使用put方法

void put(E e) throws InterruptedException;

超时等待使用offer方法

boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

BlockingQueue的父接口Queue关于拿走元素的接口有两个:remove和pool。

两者的区别在于当队列为空时前者会抛出NoSuchElementException,而后者返回null。

E remove();
E poll();

添加元素的接口也有两个:add和offer。两者的区别在于当队列为满时前者会抛出IllegalStateException,而后者返回false。

boolean add(E e);
boolean offer(E e);

一般来说Queue类型的数据结构会有两种实现:数组和链表。对应到BlockingQueue就是ArrayBlockingQueue和LinkedBlockingQueue,两者都是基于AbstractQueue实现的。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable

这里很有必要说说AbstractQueue,AbstractQueue只是实现了add和remove方法,而且很有意思的是这两个方法都是借助他们对应的无异常版本的方法offer和pool来实现的。

    public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
    public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}

这样做的好处无疑是提供了良好的扩展性,也就是把真正添加/拿走元素的实现留给子类来完成(可以实现线程安全和非线程安全两个版本)。

研究BlockingQueue关注的重点就是Blocking是如果实现的,接下来的两篇文章将会详细分析ArrayBlockingQueue和LinkedBlockingQueue如何实现线程的Blocking。