SynchronousQueue------TransferQueue源码分析

时间:2023-03-09 17:11:15
SynchronousQueue------TransferQueue源码分析

不像ArrayBlockingQueue、LinkedBlockingDeque之类的阻塞队列依赖AQS实现并发操作,SynchronousQueue直接使用CAS实现线程的安全访问。由于源码中充斥着大量的CAS代码,不易于理解,所以按照笔者的风格,接下来会使用简单的示例来描述背后的实现模型。

队列的实现策略通常分为公平模式和非公平模式,接下来将分别进行说明。

3.2.1、公平模式下的模型:

  公平模式下,底层实现使用的是TransferQueue这个内部队列,它有一个head和tail指针,用于指向当前正在等待匹配的线程节点。
初始化时,TransferQueue的状态如下:SynchronousQueue------TransferQueue源码分析接着我们进行一些操作:1、线程put1执行 put(1)操作,由于当前没有配对的消费线程,所以put1线程入队列,自旋一小会后睡眠等待,这时队列状态如下:SynchronousQueue------TransferQueue源码分析2、接着,线程put2执行了put(2)操作,跟前面一样,put2线程入队列,自旋一小会后睡眠等待,这时队列状态如下:SynchronousQueue------TransferQueue源码分析3、这时候,来了一个线程take1,执行了 take操作,take1线程一定可以和次首节点(head.next)也是匹配的呢?其实大家可以拿个纸画一画,就会发现真的就是这样的。
公平策略总结下来就是:队尾匹配队头出队。
执行后put1线程被唤醒,take1线程的 take()方法返回了1(put1线程的数据),这样就实现了线程间的一对一通信,这时候内部状态如下:SynchronousQueue------TransferQueue源码分析4、最后,再来一个线程take2,执行take操作,这时候只有put2线程在等候,而且两个线程匹配上了,线程put2被唤醒,
take2线程take操作返回了2(线程put2的数据),这时候队列又回到了起点,如下所示:SynchronousQueue------TransferQueue源码分析

一个节点加进去后,设置这个节点的waiter=这个线程,并且这个线程park阻塞,消费者生产者来了之后,从节点中获取waiter线程唤醒线程,线程继续执行完,然后设置节点的waiter=null。线程中断时候,设置这个线程对应的节点的item是节点自己,来标记这个线程中断了或者超时了。

SynchronousQueue------TransferQueue源码分析

put方法调用,生产者或者消费者进去。增加一个节点进去要2步:尾节点next新增元素,尾节点指向新元素。CAS就是 需要加锁的地方用cas,不需要的地方不用CAS。就是这2步用CAS单线程操作成功

第一个节点进来,旋转后设置线程,然后阻塞,后面节点进来,加到队列,设置节点的线程,然后阻塞。队列只可能全部是生产者节点或者消费者节点,尾进头出。头没有元素尾有元素。多个生产者或者消费者,入队的时候是多线程同时操作一个共享变量TransferQueue。

void advanceTail(QNode t, QNode nt) {   //TransferQueue的方法
if (tail == t) //准备把队列的尾节点从t(局部变量的尾节点)变为tn,tail !=t,说明队列的尾节点已经改变了,别的线程帮助推进了,就不用动了。
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt); //设置这个queue的尾节点,从t变成nt(不是这样的),原来的值是t就成功。CAS的时候一定要把原来的值传进去,看原来的值是不是这个,然后更新。
}
//TransferQueue的方法
void advanceHead(QNode h, QNode nh) { // 将head节点从h(原来头结点)变为nh(原来头结点的第一个next节点)
if (h == head && // 原来的head节点地址暂存h,
UNSAFE.compareAndSwapObject(this, headOffset, h, nh)) //改变head的地址值指向
h.next = h; // forget old next,原来头结点next指针改变
}

SynchronousQueue------TransferQueue源码分析

SynchronousQueue------TransferQueue源码分析

需要进行原子操作的方法有:队列改变头结点,队列改变尾节点,队列改变cleanMe节点,节点修改next值,节点修改item值,都用CAS操作,就相当于加锁了,多个线程同时修改共享变量,只有一个成功。
CAS的时候一定要把原来的值传进去,看原来的值是不是这个,然后更新。
一个节点加进去后,设置这个节点的waiter=这个线程,并且这个线程park阻塞,消费者生产者来了之后,从节点中获取waiter线程,唤醒线程,线程继续执行完,然后设置节点的waiter=null。线程中断时候,设置这个线程对应的节点的item是节点自己,来标记这个线程中断了。

SynchronousQueue------TransferQueue源码分析

SynchronousQueue------TransferQueue源码分析

节点超时或者中断时候,删除节点

SynchronousQueue------TransferQueue源码分析

SynchronousQueue------TransferQueue源码分析

package com.itmayiedu;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.*;
import java.util.Spliterator;
import java.util.Spliterators; public class SynchronousQueue1<E> {
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
static final int NCPUS = Runtime.getRuntime().availableProcessors();
static final int maxTimedSpins = (NCPUS < ) ? : ;
static final int maxUntimedSpins = maxTimedSpins * ;
static final long spinForTimeoutThreshold = 1000L; static final class TransferQueue<E> extends Transferer<E> {
static final class QNode {
volatile QNode next;
volatile Object item;
volatile Thread waiter;
final boolean isData;//生产消费类型
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
//设置节点的next,item,采用到CAS
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&//next不对就不需要cas了,避免依次cas。即使这里被打断了,另外线程修改了next
//也不要紧,下面的CAS会失败
//如果执行到这里让出时间片,别的线程入队没有出队,head没改,tail改了,
//此时h是对的,t是错的,t还是之前的tail,cas的时候说t的next应该是null,
//此时不是了(现在的t也就是之前的tail已经有后续节点了),所以不进行cas,继续continue,
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
//节点被取消了,item就是自己,超时和线程中断,
boolean isCancelled() {
return item == this;
}
boolean isOffList() {
return next == this;
}
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = getUnsafe();//= sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
transient volatile QNode head;
transient volatile QNode tail;
/**
大家知道 删除一个节点 直接 A.CASNext(B, B.next) 就可以,但是当 节点 B 是整个队列中的末尾元素时,
一个线程删除节点B, 一个线程在节点B之后插入节点 这样操作容易致使插入的节点丢失, 这个cleanMe很像
ConcurrentSkipListMap 中的 删除添加的 marker 节点, 他们都是起着相同的作用
*/
transient volatile QNode cleanMe;//要删除的节点的前一个节点,cleanMe的下一个节点是因中断或超时需要删除的节点,
//这个节点所在线程被中断了,或者节点超时了,放在上面不要紧,取节点时候x==m,跳过即可。
//在清除 队列最尾端节点时, 不直接删除这个节点, 而是间删除节点的前继节点标示为 cleanMe 节点, 为下次删除做准备,
//队列里面只能全是生产者或者消费者,不可能生产和消费者共存。,然后消费者过来移除节点。(队列即可能是生产者队列也可能是消费者队列)
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
void advanceHead(QNode h, QNode nh) {
if (h == head &&
UNSAFE.compareAndSwapObject(this, headOffset, h, nh))
h.next = h; // 丢弃头节点 help gc,
}
void advanceTail(QNode t, QNode nt) {
if (tail == t)//即使这里也会被停住,其他线程执行
UNSAFE.compareAndSwapObject(this, tailOffset, t, nt);
}
boolean casCleanMe(QNode cmp, QNode val) {
return cleanMe == cmp &&
UNSAFE.compareAndSwapObject(this, cleanMeOffset, cmp, val);
}
//TransferQueue 的 transfer方法。transfer谁在调用。每个节点的线程调用,共享同一个队列。
//transfer里面有加入队列,阻塞awaitFulfill,阻塞后唤醒,取节点内容,唤醒节点,取到节点返回,被唤醒返回 代码。
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);//生产者isData=true,消费者isData=false
for (;;) {
//多线程操作queue的成员变量tail,head,是会跟着改变的。t,h就不会变。t,h还是之前的那个tail和head。
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 其他线程改变了头尾节点,再次来
continue;
//1 加入队列排队阻塞(即可能是生产者队列也可能是消费者队列),然后等待唤醒。
if (h == t || t.isData == isData) { //第一个节点或者进来的节点跟尾节点生产消费类型一样,就加入排队,否则就是来取节点的。
QNode tn = t.next;
if (t != tail) // 其他线程改变了尾节点,再次来
continue;
if (tn != null) { //其他线程为尾节点添加了next,但是还没有将tail指向新的节点,再次来
advanceTail(t, tn);//帮助将tail指向新的节点
continue;
}
if (timed && nanos <= )
return null;
if (s == null) //之前的节点s没有help gc,这里不为null就继续使用。
s = new QNode(e, isData);
//加入队列,成功则T一定是是S的前一个节点
if (!t.casNext(null, s)) // 尾节点的next从null变为新节点,每一处都有可能被停住。失败了说明有其他线程将尾节点的next从null变为其他新节点
continue;
advanceTail(t, s); // 改变尾节点,失败了,说明别的线程已经做了,自己什么都不用做。
//当前线程再这个函数的这里阻塞,唤醒時候,从这里执行,函數参数还是原来的。
//2 加入队列后排队阻塞等待
Object x = awaitFulfill(s, e, timed, nanos);//生产者或者消费者阻塞,等待唤醒。即可能是生产者被唤醒也可能是消费者被唤醒。
//3 排队被唤醒
if (x == s) { //线程中断超时返回的x是节点s,否则就是正常取出返回。
clean(t, s);//节点的删除是节点上的线程自己来删除的,失效节点不影响正常的入队和出队,跳过即可
return null;
}
//不是中断的返回。x是item,只不过被改变了。
if (!s.isOffList()) { // 若s节点还没有从队列删除
advanceHead(t, s); // 改变头节点
if (x != null) // item != null,说明是消费者被生产者唤醒了,
s.item = s;//item变为自己
s.waiter = null;//节点等待的线程变为null。
}
return (x != null) ? (E)x : e;//生产者被唤醒x=null,返回原来的e,消费者被唤醒x!=null,返回x。
//取节点,取到节点后,取唤醒别人。
} else { //如果队列里面之前是生产者,现在消费者过来了。
QNode m = h.next; // 获取头节点的下一个节点,,每次都是从头结点下一个节点开始获取数据
if (t != tail || m == null || h != head)
continue; // 其他线程改变了头尾节点,再次来
Object x = m.item;
if (isData == (x != null) || // true
x == m || // m节点的item=节点自己,这个节点m的线程被中断了或者超时了,就跳过这个节点,不使用
!m.casItem(x, e)) { // 消费者过来,就把节点m的内容从节点内容变为null,生产者过来,就把节点内容从null变为生产者的内容。此时节点的内容不再等于节点的内容
advanceHead(h, m); // 将m设置为头结点,h出列,然后重试
continue;
}
advanceHead(h, m); // 成功匹配了,m设置为头结点h出列,向前推进,移除匹配到的节点
//唤醒节点的线程
LockSupport.unpark(m.waiter);
return (x != null) ? (E)x : e;//消费者过来返回生产者的内容x,生产者过来返回生产者自己的内容
}
}
}
//awaitFulfill里面有自旋,阻塞,唤醒后代码
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {//s是节点,e是节点元素
final long deadline = timed ? System.nanoTime() + nanos : 0L;// 计算超时时间点
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?//这个节点是第一个节点,超时旋转32次不超时旋转512次,不是第一个节点不旋转。
(timed ? maxTimedSpins : maxUntimedSpins) : );
for (;;) {
if (w.isInterrupted())// 被中断的线程不一定要立即停止正在做的事情。相反,中断是礼貌地请求另一个线程在它愿意并且方便的时候停止它正在做的事情。
s.tryCancel(e);//线程中断,就将这个线程所在的节点的item内容设置成节点自己
Object x = s.item;
if (x != e)//节点的内容不在等于节点的内容(即可能是生产者被唤醒也可能是消费者被唤醒),生产者被唤醒了返回null,消费者被唤醒了返回e。
//线程中断了,item=s,item!=e,此时也返回,只不过返回的x=节点自己。
return x;
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > )
--spins;//自旋的过程中会不断判断是否超时或者中断了,如果中断或者超时了则调用tryCancel()取消该节点。
else if (s.waiter == null)
s.waiter = w;//新进来节点的waiter是当前线程,自旋完之后给这个节点设置线程,
else if (!timed)// 设置没有超时地阻塞线程
LockSupport.park(this);//当前线程再这个函数的这里阻塞,唤醒時候,从这里执行,函數参数还是原来的。线程再这个函数这里阻塞唤醒和直接运行是一样的,形参和局部变量都在线程里面不变。
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);// 设置具有超时地阻塞线程
}
}
//执行清除的时候,入队和出队正常执行,取到这个节点时候跳过即可
//清除这个节点的线程就是这个节点上阻塞的线程来循环移除。cleanMe只有一个,多个尾节点失效时候就会有多线程来调用clean方法,
void clean(QNode pred, QNode s) {//s是提出的节点,pred是前一个节点
s.waiter = null;
while (pred.next == s) {
QNode h = head;
QNode hn = h.next;
if (hn != null && hn.isCancelled()) {//删除的节点是第一个节点h.next
advanceHead(h, hn);//推进头节点,头节点的next=头节点,pred.next = pred != s,
continue;
}
QNode t = tail;
if (t == h)
return;// 队列为空,直接return null
QNode tn = t.next;
if (t != tail)// 不一致,说明有其他线程改变了tail节点,重新开始
continue;
if (tn != null) { // tn != null 推进tail节点,重新开始
advanceTail(t, tn);
continue;
}
//列表上最后插入的节点不能被删除。 我们将其前驱设置为“CleanMe”,
if (s != t) { // 移除的节点不是尾节点,接以pred.casNext(s, s.next)方式来进行删除
QNode sn = s.next;
// 如果s已经被移除退出循环,
if (sn == s || pred.casNext(s, sn))//s的前一个节点的下一个节点从s变为s的next,
return;
}
// s是尾节点,
QNode dp = cleanMe;
//此时cleanMe != null,先删除cleanMe标记需要删除的节点,然后将cleanMe置为null,让后再将pred赋值给cleanMe
if (dp != null) { // 如果dp不为null,先删除上次需要删除的节点d,
QNode d = dp.next;//cleanMe标记需要删除的节点d,
QNode dn;
if (d == null || // 节点d已经删除
d == dp || // 原来的节点 cleanMe 已经通过 advanceHead 进行删除
!d.isCancelled() || // 原来的节点 s已经删除
(d != t && // d not tail and
(dn = d.next) != null && // d 不是tail节点
dn != d &&
dp.casNext(d, dn))) // 删除cleanMe标记的节点d,多线程访问只有一个成功,失败的继续循环,此时这个尾节点有可能是中间节点了,直接删除。真正尾节点不会删除。
// 清除 cleanMe 置为null,
casCleanMe(dp, null);
if (dp == pred)//dp == pred 若成立, 说明这次删除的就是上次要删除的, 直接return, 不然的话要再次循环来删除这次需要删除的节点。
return;
//cleanMe == null, 上次没有要删除的节点,这次因为是尾节点也不删除,则 前继节点pred标记为 cleanMe, 为下次删除做准备。下次是别的失效的节点的线程。
} else if (casCleanMe(null, pred))
return;
}
} private static final sun.misc.Unsafe UNSAFE;
private static final long headOffset;
private static final long tailOffset;
private static final long cleanMeOffset;
static {
try {
UNSAFE = getUnsafe();//= sun.misc.Unsafe.getUnsafe();
Class<?> k = TransferQueue.class;
headOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("head"));
tailOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("tail"));
cleanMeOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("cleanMe"));
} catch (Exception e) {
throw new Error(e);
}
}
} private transient volatile Transferer<E> transferer; public SynchronousQueue1() {
this(false);
} public SynchronousQueue1(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferQueue<E>();
}
//生产者:1排队被消费者唤醒,返回e。2取消费者节点。都不为null。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, false, ) == null) {
Thread.interrupted();
throw new InterruptedException();
}
} public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
} public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, ) != null;
}
//消费者:1排队被生产者唤醒,返回e。2取生产者节点。都不为null
public E take() throws InterruptedException {
E e = transferer.transfer(null, false, );
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
} public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
} public E poll() {
return transferer.transfer(null, true, );
} private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException tryReflectionInstead) {}
try {
return java.security.AccessController.doPrivileged
(new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
for (java.lang.reflect.Field f : k.getDeclaredFields()) {
f.setAccessible(true);
Object x = f.get(null);
if (k.isInstance(x))
return k.cast(x);
}
throw new NoSuchFieldError("the Unsafe");
}});
} catch (java.security.PrivilegedActionException e) {
throw new RuntimeException("Could not initialize intrinsics",
e.getCause());
}
} }

生产者线程调用transfer,在堆里面构造一个节点然后加到queue的尾部,线程自己然后归属到这个节点,阻塞(方法局部变量s保留)。消费者线程调用transfer,修改并移除queue的头节点,返回值,唤醒头节点的线程。头节点上生产者线程唤醒后,通过线程栈里面局部变量s(就是生产者线程归属的节点),设置item=自己,waiter = null(把线程自己从节点属性移除,线程自己还是可以继续运行的,只要线程不停止就会一直运行,只是移除了一个节点对象对他的引用),返回值。或者在队列里面移除这个曾经归属的节点。

为什么使用局部变量

QNode t = tail;
QNode h = head;

局部变量就是每个线程里面都有自己的一个头尾节点(不一定是真正的头尾节点),修改共享堆里面的内容时候,都要判断头尾节点是否改变了(其实就相当于做了单线程控制),不是用局部变量直接使用堆里面的头尾节点,执行这一句时候advanceTail(t, tn)在判断tail的原值时候一直是成立的,就会出现问题。执行这一句 t.casNext(null, s) 不会出现问题。

SynchronousQueue------TransferQueue源码分析