死磕 java集合之DelayQueue源码分析

时间:2022-11-12 21:22:24

问题

(1)DelayQueue是阻塞队列吗?

(2)DelayQueue的实现方式?

(3)DelayQueue主要用于什么场景?

简介

DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务。

继承体系

死磕 java集合之DelayQueue源码分析

从继承体系可以看到,DelayQueue实现了BlockingQueue,所以它是一个阻塞队列。

另外,DelayQueue还组合了一个叫做Delayed的接口,DelayQueue中存储的所有元素必须实现Delayed接口。

那么,Delayed是什么呢?

public interface Delayed extends Comparable<Delayed> {

    long getDelay(TimeUnit unit);
}

Delayed是一个继承自Comparable的接口,并且定义了一个getDelay()方法,用于表示还有多少时间到期,到期了应返回小于等于0的数值。

源码分析

主要属性

// 用于控制并发的锁
private final transient ReentrantLock lock = new ReentrantLock();
// 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
// 条件,用于表示现在是否有可取的元素
private final Condition available = lock.newCondition();

从属性我们可以知道,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。

因为优先级队列是*的,所以这里只需要一个条件就可以了。

还记得优先级队列吗?点击链接直达【死磕 java集合之PriorityQueue源码分析

主要构造方法

public DelayQueue() {}

public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}

构造方法比较简单,一个默认构造方法,一个初始化添加集合c中所有元素的构造方法。

入队

因为DelayQueue是阻塞队列,且优先级队列是*的,所以入队不会阻塞不会超时,因此它的四个入队方法是一样的。

public boolean add(E e) {
return offer(e);
} public void put(E e) {
offer(e);
} public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
} public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}

入队方法比较简单:

(1)加锁;

(2)添加元素到优先级队列中;

(3)如果添加的元素是堆顶元素,就把leader置为空,并唤醒等待在条件available上的线程;

(4)解锁;

出队

因为DelayQueue是阻塞队列,所以它的出队有四个不同的方法,有抛出异常的,有阻塞的,有不阻塞的,有超时的。

我们这里主要分析两个,poll()和take()方法。

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}

poll()方法比较简单:

(1)加锁;

(2)检查第一个元素,如果为空或者还没到期,就返回null;

(3)如果第一个元素到期了就调用优先级队列的poll()弹出第一个元素;

(4)解锁。

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 堆顶元素
E first = q.peek();
// 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待
if (first == null)
available.await();
else {
// 堆顶元素的到期时间
long delay = first.getDelay(NANOSECONDS);
// 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素
if (delay <= 0)
return q.poll(); // 如果delay大于0 ,则下面要阻塞了 // 将first置为空方便gc,因为有可能其它元素弹出了这个元素
// 这里还持有着引用不会被清理
first = null; // don't retain ref while waiting
// 如果前面有其它线程在等待,直接进入等待
if (leader != null)
available.await();
else {
// 如果leader为null,把当前线程赋值给它
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待delay时间后自动醒过来
// 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期
// 这里即使醒过来后也不一定能获取到元素
// 因为有可能其它线程先一步获取了锁并弹出了堆顶元素
// 条件锁的唤醒分成两步,先从Condition的队列里出队
// 再入队到AQS的队列中,当其它线程调用LockSupport.unpark(t)的时候才会真正唤醒
// 关于AQS我们后面会讲的^^
available.awaitNanos(delay);
} finally {
// 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程
if (leader == null && q.peek() != null)
// signal()只是把等待的线程放到AQS的队列里面,并不是真正的唤醒
available.signal();
// 解锁,这才是真正的唤醒
lock.unlock();
}
}

take()方法稍微要复杂一些:

(1)加锁;

(2)判断堆顶元素是否为空,为空的话直接阻塞等待;

(3)判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素;

(4)没到期,再判断前面是否有其它线程在等待,有则直接等待;

(5)前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素;

(6)获取到元素之后再唤醒下一个等待的线程;

(7)解锁;

使用方法

说了那么多,是不是还是不知道怎么用呢?那怎么能行,请看下面的案例:

public class DelayQueueTest {
public static void main(String[] args) {
DelayQueue<Message> queue = new DelayQueue<>(); long now = System.currentTimeMillis(); // 启动一个线程从队列中取元素
new Thread(()->{
while (true) {
try {
// 将依次打印1000,2000,5000,7000,8000
System.out.println(queue.take().deadline - now);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start(); // 添加5个元素到队列中
queue.add(new Message(now + 5000));
queue.add(new Message(now + 8000));
queue.add(new Message(now + 2000));
queue.add(new Message(now + 1000));
queue.add(new Message(now + 7000));
}
} class Message implements Delayed {
long deadline; public Message(long deadline) {
this.deadline = deadline;
} @Override
public long getDelay(TimeUnit unit) {
return deadline - System.currentTimeMillis();
} @Override
public int compareTo(Delayed o) {
return (int) (getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
} @Override
public String toString() {
return String.valueOf(deadline);
}
}

是不是很简单,越早到期的元素越先出队。

总结

(1)DelayQueue是阻塞队列;

(2)DelayQueue内部存储结构使用优先级队列;

(3)DelayQueue使用重入锁和条件来控制并发安全;

(4)DelayQueue常用于定时任务;

彩蛋

java中的线程池实现定时任务是直接用的DelayQueue吗?

当然不是,ScheduledThreadPoolExecutor中使用的是它自己定义的内部类DelayedWorkQueue,其实里面的实现逻辑基本都是一样的,只不过DelayedWorkQueue里面没有使用现成的PriorityQueue,而是使用数组又实现了一遍优先级队列,本质上没有什么区别。


欢迎关注我的公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一起畅游源码的海洋。

死磕 java集合之DelayQueue源码分析

死磕 java集合之DelayQueue源码分析的更多相关文章

  1. 死磕 java集合之PriorityBlockingQueue源码分析

    问题 (1)PriorityBlockingQueue的实现方式? (2)PriorityBlockingQueue是否需要扩容? (3)PriorityBlockingQueue是怎么控制并发安全的 ...

  2. 死磕 java集合之PriorityQueue源码分析

    问题 (1)什么是优先级队列? (2)怎么实现一个优先级队列? (3)PriorityQueue是线程安全的吗? (4)PriorityQueue就有序的吗? 简介 优先级队列,是0个或多个元素的集合 ...

  3. 死磕 java集合之CopyOnWriteArraySet源码分析——内含巧妙设计

    问题 (1)CopyOnWriteArraySet是用Map实现的吗? (2)CopyOnWriteArraySet是有序的吗? (3)CopyOnWriteArraySet是并发安全的吗? (4)C ...

  4. 死磕 java集合之LinkedHashSet源码分析

    问题 (1)LinkedHashSet的底层使用什么存储元素? (2)LinkedHashSet与HashSet有什么不同? (3)LinkedHashSet是有序的吗? (4)LinkedHashS ...

  5. 死磕 java集合之ConcurrentHashMap源码分析(三)

    本章接着上两章,链接直达: 死磕 java集合之ConcurrentHashMap源码分析(一) 死磕 java集合之ConcurrentHashMap源码分析(二) 删除元素 删除元素跟添加元素一样 ...

  6. 死磕 java集合之ArrayDeque源码分析

    问题 (1)什么是双端队列? (2)ArrayDeque是怎么实现双端队列的? (3)ArrayDeque是线程安全的吗? (4)ArrayDeque是有界的吗? 简介 双端队列是一种特殊的队列,它的 ...

  7. 【死磕 Java 集合】— ConcurrentSkipListMap源码分析

    转自:http://cmsblogs.com/?p=4773 [隐藏目录] 前情提要 简介 存储结构 源码分析 主要内部类 构造方法 添加元素 添加元素举例 删除元素 删除元素举例 查找元素 查找元素 ...

  8. 死磕 java集合之LinkedList源码分析

    问题 (1)LinkedList只是一个List吗? (2)LinkedList还有其它什么特性吗? (3)LinkedList为啥经常拿出来跟ArrayList比较? (4)我为什么把LinkedL ...

  9. 死磕 java集合之ConcurrentSkipListSet源码分析——Set大汇总

    问题 (1)ConcurrentSkipListSet的底层是ConcurrentSkipListMap吗? (2)ConcurrentSkipListSet是线程安全的吗? (3)Concurren ...

随机推荐

  1. 【SAP业务模式】之ICS(六):发票输出类型

    这篇开始主要讲述发票输出类型: 首先我们新建一个发票类型,用于公司间的发票MIV,而标准的发票类型还是F2保持不变: 一.新建发票类型: 目录:SPRO-销售与分销-出具发票-开票凭证-定义出具发票类 ...

  2. flash小游戏在Kongregate上线——BasketBall Master&lpar;篮球大师&rpar;

    小游戏地址,欢迎上去留言评论.游戏完成度没有达到期望水平,只能算完成了核心玩法吧,一些其他构想来不及实现. BasketBall Master(篮球大师) 这个小游戏很早之前就基本做好了,只因有些细节 ...

  3. arry&lpar;&rpar;数组的理解及api的使用(二)

    注意:本文都来自于w3school中文网,如果需要完整版请去--http://www.w3school.com.cn/jsref/jsref_obj_array.asp 1.1 slice() 方法- ...

  4. java中拼接两个数组

    int a[]={1,2,3,2}; int b[]={4,2,90,8,98}; int[] d3 = new int[a.length + b.length]; System.arraycopy( ...

  5. 5种方法推导Normal Equation

    引言: Normal Equation 是最基础的最小二乘方法.在Andrew Ng的课程中给出了矩阵推到形式,本文将重点提供几种推导方式以便于全方位帮助Machine Learning用户学习. N ...

  6. mysql数据库第三弹

    mysql数据库知识拓展 视图 视图是一个虚拟表(非真实存在),其本质是[根据SQL语句获取动态的数据集,并为其命名],用户使用时只需使用[名称]即可获取结果集,并可以将其当作表来使用. SELECT ...

  7. win10 激活(亲测可用)

    转自http://www.ylmfwin100.com/ylmf/8643.html 1.以管理员身份运行命令行(这一步一定要做) 2.依次输入以下命令 slmgr.vbs /upk (此时弹出窗口显 ...

  8. gcc&sol;g&plus;&plus;

    $gcc -g -Wall -ansi -pedantic main.cpp -lstdc++ -std=c++11 -lpthread -o xmain

  9. 131、ThreadLocal (转载)

    http://blog.csdn.net/lufeng20/article/details/24314381 http://baike.baidu.com/link?url=7eL0qQm_5ULls ...

  10. Go语言反射之反射调用

    ## 1 概述利用反射,不仅可以获取信息,还可以创建实例,执行函数和方法.就是反射代理执行. <!-- more -->## 2 创建实例创建实例的前提是具有 `reflect.Type` ...