Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析

时间:2022-11-11 14:07:01

系列传送门:

LinkedBlockingQueue概述

LinkedBlockingQueue是由单链表构成的界限可选的阻塞队列,如不指定边界,则为Integer.MAX_VALUE,因此如不指定边界,一般来说,插入的时候都会成功。

LinkedBlockingQueue支持FIFO先进先出的次序对元素进行排序。

类图结构及重要字段

Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
// 单链表节点
static class Node<E> {
E item; Node<E> next; Node(E x) { item = x; }
} /** 容量,如果不指定就是Integer.MAX_VALUE */
private final int capacity; /** 原子变量,记录元素个数 */
private final AtomicInteger count = new AtomicInteger(); /**
* 哨兵头节点,head.next才是队列的第一个元素
*/
transient Node<E> head; /**
* 指向最后一个元素
*/
private transient Node<E> last; /** 用来控制同时只有一个线程可以从队头获取元素 */
private final ReentrantLock takeLock = new ReentrantLock(); /** 条件队列,队列为空时,执行出队take操作的线程将会被置入该条件队列 */
private final Condition notEmpty = takeLock.newCondition(); /** 用来控制同时只有一个线程可以从队尾插入元素 */
private final ReentrantLock putLock = new ReentrantLock(); /** 条件队列,队列满时,执行入队操作put的线程将会被置入该条件队列 */
private final Condition notFull = putLock.newCondition();
}
  • 单向链表实现,维护head和last两个Node节点,head是哨兵节点,head.next是第一个真正的元素,last指向队尾节点。
  • 队列中的元素通过AtomicInteger类型的原子变量count记录。
  • 维护两把锁:takeLock保证同时只有一个线程可以从对头获取元素,putLock保证只有一个线程可以在队尾插入元素。
  • 维护两个条件变量:notEmpty和notFull,维护条件队列,用以存放入队出队阻塞的线程。

如果希望获取一个元素,需要先获取takeLock锁,且notEmpty条件成立。

如果希望插入一个元素,需要先获取putLock锁,且notFull条件成立。

构造器

使用LinkedBlockingQueue的时候,可以指定容量,也可以使用默认的Integer.MAX_VALUE,几乎就是*的了,当然,也可以传入集合对象,直接构造。

	// 如果不指定容量,默认容量为Integer.MAX_VALUE  (1 << 30) - 1
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
} // 传入指定的容量
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
// 初始化last 和 head指针
last = head = new Node<E>(null);
} // 传入指定集合对象,容量视为Integer.MAX_VALUE,直接构造queue
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
// 写线程获取putLock
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}

出队和入队操作

队列的操作最核心的部分莫过于入队和出队了,后面分析的方法基本上都基于这两个工具方法。

LinkedBlockingQueue的出队和入队相对ArrayBlockingQueue来说就简单很多啦:

入队enqueue

    private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
  1. 将node连接到last的后面。
  2. 更新last指针的位置,指向node。

出队dequeue

    private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first; // head向后移一位
E x = first.item;
first.item = null;
return x;
}

队列中的元素实际上是从head.first开始的,那么移除队头,其实就是将head指向head.next即可。

阻塞式操作

E take() 阻塞式获取

take操作将会获取当前队列头部元素并移除,如果队列为空则阻塞当前线程直到队列不为空,退出阻塞时返回获取的元素。

如果线程在阻塞时被其他线程设置了中断标志,则抛出InterruptedException异常并返回。

    public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
// 首先要获取takeLock
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 如果队列为空, notEmpty不满足,就等着
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// c先赋值为count的值, count 减 1
c = count.getAndDecrement();
// 这次出队后至少还有一个元素,唤醒notEmpty中的读线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// c == capacity 表示在该元素出队之前,队列是满的
if (c == capacity)
// 因为在这之前队列是满的,可能会有写线程在等着,这里做个唤醒
signalNotFull();
return x;
} // 用于唤醒写线程
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
// 获取putLock
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

void put(E e) 阻塞式插入

put操作将向队尾插入元素,如果队列未满则插入,如果队列已满,则阻塞当前线程直到队列不满。

如果线程在阻塞时被其他线程设置了中断标志,则抛出InterruptedException异常并返回。

    public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// 所有的插入操作 都约定 本地变量c 作为是否失败的标识
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
// 插入操作获取 putLock
putLock.lockInterruptibly();
try {
// 队列满,这时notFull条件不满足,await
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
// c先返回count的值 , 原子变量 + 1 ,
c = count.getAndIncrement();
// 至少还有一个空位可以插入,notFull条件是满足的,唤醒它
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c == 0 表示在该元素入队之前,队列是空的
if (c == 0)
// 因为在这之前队列是空的,可能会有读线程在等着,这里做个唤醒
signalNotEmpty();
} // 用于唤醒读线程
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 获取takeLock
takeLock.lock();
try {
// 唤醒
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

E poll(timeout, unit) 阻塞式超时获取

在take阻塞式获取方法的基础上额外增加超时功能,传入一个timeout,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回null。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 这里就是超时机制的逻辑所在
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

boolean offer(e, timeout, unit) 阻塞式超时插入

在put阻塞式插入方法的基础上额外增加超时功能,传入一个timeout,获取不到而阻塞的时候,如果时间到了,即使还获取不到,也只能立即返回null。

    public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException { if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

其他常规操作

boolean offer(E e)

offer(E e)是非阻塞的方法,向队尾插入一个元素,如果队列未满,则插入成功并返回true;如果队列已满则丢弃当前元素,并返回false。

    public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 此时队列已满,直接返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
// 插入操作 获取putLock
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 加锁后再校验一次
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0; // 只要不是-1,就代表成功~
}

E poll()

从队列头部获取并移除第一个元素,如果队列为空则返回null。

    public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 如果队列不为空,则出队, 并递减计数
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

E peek()

瞅一瞅队头的元素是啥,如果队列为空,则返回null。

    public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 实际上第一个元素是head.next
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

Boolean remove(Object o)

移除队列中与元素o相等【指的是equals方法判定相同】的元素,移除成功返回true,如果队列为空或没有匹配元素,则返回false。

    public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
// trail 和 p 同时向后遍历, 如果p匹配了,就让trail.next = p.next代表移除p
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
} // trail为p的前驱, 希望移除p节点
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;// 移除p
// 如果p已经是最后一个节点了,就更新一下last
if (last == p)
last = trail;
// 移除一个节点之后,队列从满到未满, 唤醒notFull
if (count.getAndDecrement() == capacity)
notFull.signal();
}
//----- 多个锁 获取和释放的顺序是 相反的 // 同时上锁
void fullyLock() {
putLock.lock();
takeLock.lock();
}
// 同时解锁
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

总结

  • LinkedBlockingQueue是由单链表构成的界限可选的阻塞队列,如不指定边界,则为Integer.MAX_VALUE,因此如不指定边界,一般来说,插入的时候都会成功。
  • 维护两把锁:takeLock保证同时只有一个线程可以从对头获取元素,putLock保证只有一个线程可以在队尾插入元素。
  • 维护两个条件变量:notEmpty和notFull,维护条件队列,用以存放入队出队阻塞的线程。

如果希望获取一个元素,需要先获取takeLock锁,且notEmpty条件成立。

如果希望插入一个元素,需要先获取putLock锁,且notFull条件成立。

参考阅读

  • 《Java并发编程之美》
  • 《Java并发编程的艺术》

Java并发包源码学习系列:阻塞队列实现之LinkedBlockingQueue源码解析的更多相关文章

  1. Java并发包源码学习系列:JDK1&period;8的ConcurrentHashMap源码解析

    目录 为什么要使用ConcurrentHashMap? ConcurrentHashMap的结构特点 Java8之前 Java8之后 基本常量 重要成员变量 构造方法 tableSizeFor put ...

  2. Java并发包源码学习系列:阻塞队列BlockingQueue及实现原理分析

    目录 本篇要点 什么是阻塞队列 阻塞队列提供的方法 阻塞队列的七种实现 TransferQueue和BlockingQueue的区别 1.ArrayBlockingQueue 2.LinkedBloc ...

  3. Java并发包源码学习系列:阻塞队列实现之ArrayBlockingQueue源码解析

    目录 ArrayBlockingQueue概述 类图结构及重要字段 构造器 出队和入队操作 入队enqueue 出队dequeue 阻塞式操作 E take() 阻塞式获取 void put(E e) ...

  4. Java并发包源码学习系列:阻塞队列实现之PriorityBlockingQueue源码解析

    目录 PriorityBlockingQueue概述 类图结构及重要字段 什么是二叉堆 堆的基本操作 向上调整void up(int u) 向下调整void down(int u) 构造器 扩容方法t ...

  5. Java并发包源码学习系列:阻塞队列实现之DelayQueue源码解析

    目录 DelayQueue概述 类图及重要字段 Delayed接口 Delayed元素案例 构造器 put take first = null 有什么用 总结 参考阅读 系列传送门: Java并发包源 ...

  6. Java并发包源码学习系列:阻塞队列实现之SynchronousQueue源码解析

    目录 SynchronousQueue概述 使用案例 类图结构 put与take方法 void put(E e) E take() Transfer 公平模式TransferQueue QNode t ...

  7. Java并发包源码学习系列:阻塞队列实现之LinkedTransferQueue源码解析

    目录 LinkedTransferQueue概述 TransferQueue 类图结构及重要字段 Node节点 前置:xfer方法的定义 队列操作三大类 插入元素put.add.offer 获取元素t ...

  8. Java并发包源码学习系列:阻塞队列实现之LinkedBlockingDeque源码解析

    目录 LinkedBlockingDeque概述 类图结构及重要字段 linkFirst linkLast unlinkFirst unlinkLast unlink 总结 参考阅读 系列传送门: J ...

  9. Java并发包源码学习系列:基于CAS非阻塞并发队列ConcurrentLinkedQueue源码解析

    目录 非阻塞并发队列ConcurrentLinkedQueue概述 结构组成 基本不变式 head的不变式与可变式 tail的不变式与可变式 offer操作 源码解析 图解offer操作 JDK1.6 ...

随机推荐

  1. 点击得到QTableWidget中任意位置QPushButton的行列信息

    http://www.qtcn.org/bbs/read-htm-tid-51835.html http://www.qtcn.org/bbs/simple/?t43841.html 比如(点击每行最 ...

  2. C&num;线程同步(3)- 互斥量 Mutex

    文章原始出处 http://xxinside.blogbus.com/logs/47162540.html 预备知识:C#线程同步(1)- 临界区&Lock,C#线程同步(2)- 临界区&am ...

  3. 从零开始学安全&lpar;二十二&rpar;●PHP日期date参数表

    $date=new DateTime(); echo $date->format("Y-m-d h:i:s");

  4. mysql -- this is incompatible with sql&lowbar;mode&equals;only&lowbar;full&lowbar;group&lowbar;by

    select @@GLOBAL.sql_mode; set @@GLOBAL.sql_mode='STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ER ...

  5. 【zabbix】自定义监控项key值

    说明: zabbix自带的默认模版里包括了很多监控项,有时候为了满足业务需求,需要根据自己的监控项目自定义监控项,这里介绍一种自定义监控项的方式. 1,首先编写自定义监控脚本,本文以监控httpd进程 ...

  6. gitlab通过api创建组、项目、成员

    前戏 获取gitlab中admin用户的private_token Groups API 获取某个组的详细 curl --header "PRIVATE-TOKEN: *********&q ...

  7. UliPad安装

    1 http://www.cnblogs.com/dolphin0520/p/4012804.html 2 http://www.iplaypython.com/editor/ulipad.html

  8. 搭建 flask 应用

    参考文档:http://docs.jinkan.org/docs/flask/quickstart.html#a-minimal-application 1.使用Pycharm创建Flask应用 fr ...

  9. Partitioning by Palindromes UVA - 11584 简单dp

    题目:题目链接 思路:预处理出l到r为回文串的子串,然后如果j到i为回文串,dp[i] = min(dp[i], dp[j] + 1) AC代码: #include <iostream> ...

  10. ubuntu16&period;04下安装wine1&period;8&period;2

    如果是amd64则需要执行这个: sudo dpkg --add-architecture i386 1 1 添加wine最新的源 sudo add-apt-repository ppa:wine/w ...