并发之AQS原理(三) 如何保证并发

时间:2023-03-08 23:16:23
并发之AQS原理(三) 如何保证并发

并发之AQS原理(三) 如何保证并发

1. 如何保证并发

AbstractQueuedSynchronizer 维护了一个state(代表了共享资源)和一个FIFO线程等待队列(多线程竞争资源被阻塞时会将线程放入此队列)。

由于state是由volatie修饰的所以该变量的改动都是立等可见的。

并发之AQS原理(三) 如何保证并发

1.共享资源状态

private volatile int state;

2. 操作共享资源状态操作方法

// 读取该值
protected final int getState() {
return state;
}
// 更新该值 线程不安全
// 当独享该状态时使用该方法更加快捷,节省计算资源
protected final void setState(int newState) {
state = newState;
}
// 自旋更新该值 线程安全
// 当竞争修改该状态时可用该方法
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

2.如何争夺资源

AQS 定义了两种资源共享的方式 Exclusive(独占,一时间只有一个线程能访问该资源)、Share (共享,一时间可以有多个线程访问资源).

独占: 假设state初始状态为0,表示未锁定状态。线程A想使用该资源就把state修改为了1,那么线程B来访问资源时发现state是1并不是0他就会被AQS送入等待队列,

直到线程A将该资源设置为0。

共享:假设state初始状态为N,当有线程来访问后N就减少1个,直到N=0 这时就会阻塞新的线程来访问资源。当某一个线程执行完毕后会将state+1,相当于释放了该线程持有的锁。这样新的线程就可以继续访问该资源。

独占模式就像共享单车一时间只有一个人可以骑这个共享单车,共享模式就像公交车可以上去很多人,但是人一旦上满了就不能在上人了,必须要等车上的人下来后才能继续上人。

1. 独占获取资源( acquire(int i) )

此方法是独占模式下线程获取共享资源的入口。如果获取到了资源,线程直接返回,否则进入等待队列,直到获取到资源为止,而且个过程忽略中断的影响,这就是锁定的意义。获取到资源就可以去执行锁定范围内的代码了。

public final void acquire(int arg) {
// 由于&&的短路特性 获取到权限后 后面的等待队列等一系列功能将不再执行
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

该方法执行步骤:

  1. tryAcquire() 尝试直接去获取资源,如果成功则让当前执行线程继续执行。就不需要添加等待队列了。
  2. addWaiter() 将该线程加入等待队列的尾部,并且标记为独占模式。
  3. acquireQueued() 使线程在等待队列中获取资源,一直到获取到资源了才继续执行,如果在整个等待过程中被中断,返回true,否则返回false。
  4. selfInterrupt() 如果线程获取到资源后,发现线程被中断过则立即将线程中断。

1.tryAcquire() 直接获取资源


// 独占模式下尝试获取资源 成功则返回true
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

我们可以看到!tryAcquire(arg)是个取反的操作,也就是只有直接无法获取当前资源时才执行下一步。

然而AQS并没有提供实现方式。具体实现方式需要继承该框架的子类去实现。至于具体如何才能获取锁就是用户自定义的事情了。

至于为什么没有写成abstract。是为了如果不用该方法也不需要重写该方法。

在上一章的例子中,我给了一个具体实现。使用CAS方式获取state的权限。如果获取到了将当前线程放入独占执行线程中。否则视为没有抢到资源。

/**
* 尝试获取锁
*/
@Override
protected boolean tryAcquire(int arg) {
// 使用CAS方式修改状态。
// 修改成功则继续执行线程
// 线程会阻塞在这里等待。
if(compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// 否则不能执行线程
return false;
}

2. addWaiter()加入等待队列

将当前线程节点放入队列然后返回当前节点信息。

队列与节点的具体情况参考

并发之AQS原理二CLH队列与队列Node解析


private Node addWaiter(Node mode) {
// 构造出一个新的等待线程节点。
Node node = new Node(Thread.currentThread(), mode);
// 获取一下队列尾部的节点,如果节点存在则直接将当前节点挂接在尾部
Node pred = tail;
if (pred != null) {
// 将当前尾部节点设置为前驱节点。
node.prev = pred;
// CAS 方式更新尾部节点。
if (compareAndSetTail(pred, node)) {
// 将原尾部节点的下一个节点设置为node
// 双向链表设计
pred.next = node;
return node;
}
}
// 如果挂载队尾节点也存在竞争 则使用无限CAS自旋方式设置队尾。
enq(node);
return node;
} // 自旋方式挂载尾节点
private Node enq(final Node node) { for (;;) {
Node t = tail;
// 当尾节点不存在时 头节点也不存在
// 该条件只可能是在尚未有队列的时候创建队列的第一个node时会触发。
if (t == null) {
// CAS方式更新头节点创建一个新的头节点 将头节点和为节点都指向创建的新节点。
// 由于是CAS方式只会有一个线程会创建成功。一旦有头节点就可以继续接入下个节点了
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果存在尾节点则执行过程和之前快速插入队列的逻辑相似。
// 更新尾巴节点 挂接双向列表。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

将节点添加到尾部流程:

  1. 依据当前线程和线程模型创建一个新的节点。
  2. 尝试将当前节点直接挂在尾节点上。
  3. 如果尾节点不存在则头节点也不存在,自旋方式创建一个尾节点和头节点。如果在过程中尾节点已经被添加,这继续将该节点挂载在尾节点上,

具体流程入下图:

并发之AQS原理(三) 如何保证并发

3.acquireQueued()等待休息直到其他线程唤醒

当通过tryAcquire()方法获取线程失败后,使用addWaiter()将该线程放入队列的尾部。然后进入等待状态,知道其他线程释放资源后唤醒此线程,自己在拿到资源,然后就能干自己的事情了。这就和买好了票排到了队尾等待前面的人一个一个完成任务后轮到自己。实际上该方法一直在自旋(发呆)直到自己变成了头结点,如果变成头结点后该线程已经中断则中断该线程的执行。

final boolean acquireQueued(final Node node, int arg) {
// 标记自己是否拿到了资源 true为没有获取到。
boolean failed = true;
try {
// 标记等待过程中是否被中断过
boolean interrupted = false;
// 开始自旋
for (;;) {
// 获取当前节点的前置节点
final Node p = node.predecessor();
// 如果自己的前置节点是头结点,并且自己获取到了资源权限
if (p == head && tryAcquire(arg)) {
// 将自己设置为头结点
setHead(node);
// 将前置节点的后续节点强连接断开
// 相当于自己已经是头结点了不需要前置节点了。帮助GC回收垃圾
p.next = null;
//标记已经获取到节点
failed = false;
// 判断该线程是否被中断过。
return interrupted;
}
// 这两个方式是检查状态、和让线程休息 下面会详细讲解这两个方法的作用
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

此方法用于检查状态,看看自己是否可以休息了。

为了更好理解我重复下上一章的知识点,AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 检查前置节点的装填
int ws = pred.waitStatus;
// 如果前置节点的状态为 等待唤醒的后置节点,这放心休息。
if (ws == Node.SIGNAL)
return true;
// 大于0表示该节点已经被中断,或者已经结束,
if (ws > 0) {
// 如果前置节点不是正常的等待状态那么就继续往前找直到找到一个正在等待装填的节点。将其后置节点断开接上当前节点。GC会回收一堆相互引用又没有外部引用的节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果该节点是其他状态就将其修改为等待状态 主要照顾共享节点
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

次方法就是让线程去等待。如果线程被中断过则返回true

private final boolean parkAndCheckInterrupt() {
// 调用park让线程进入wait状态
LockSupport.park(this);
// 检查线程是否中断过。
return Thread.interrupted();
}

acquireQueued()的执行流程如下。

  1. 节点进入队列尾巴后,检查状态,找到安全的等待位置等待。
  2. 调用park()让线程进入wait状态,等待unPark()或者interrput()唤醒。
  3. 被唤醒后,看自己是不是有资格拿到资源。如果拿到,将head指向当前节点。并返回从入队到拿到的整个过程中是否被中断过。如果没有则回到1号流程循环执行。

具体流程入下图:

并发之AQS原理(三) 如何保证并发

4.selfInterrupt()中断该线程的执行

如果线程在等待过程中被中断过,那么获取到资源后才会通知线程中断。


//自助中断该线程。
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}

5.acquire()竞争获取资源流程总结

  1. 调用用户自定义的同步器tryAcquire()去尝试获取资源,如果成功则直接进入临界区执行代码。
  2. 没有获取到资源则将该线程组装成一个结点放入队列尾部分,并且标记为独占模式。
  3. 使线程在队列中休息,唤醒时会被unPark()会尝试获取资源。获取到资源后才返回.如果在整个等待过程中被中断过则返回true 否则返回false.
  4. 如果线程在等待过程中被中断过,他是不会响应的,只是在获取到资源后直接调用自我中断,将中断线程。

流程图如下

并发之AQS原理(三) 如何保证并发

1. 独占释放资源( release(int i) )

此方法是acquire()的反操作,是独占模式下线程释放共享资源的入口。他会释放指定资源,如果彻底释放则 state = 0 ,他会唤醒等待队列里的其他线程来获取资源。这就是解锁的意义。下面是release()源码。

为了更好理解我再次重复下上一章的知识点,AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

public final boolean release(int arg) {
// 尝试释放资源
if (tryRelease(arg)) {
Node h = head;
// 头结点存在 且头结点状态不为0
// 如果头结点不为初始化状态则唤醒队列下一个等待的线程
if (h != null && h.waitStatus != 0)
// 唤醒线程
unparkSuccessor(h);
return true;
}
return false;
}

1. 尝试释放资源tryRelease()

和tryAcquire()一样,这个方法时需要独占模式自定义同步器去实现的。如果是独占模式下该线程释放资源说明这个线程已经拿到资源了。直接减掉相应的资源状态即可,也不需要考虑线程安全的问题。但是release是通过tryRelease来判断是否释放过资源的,所以如果已经释放资源则应当返回true,否则返回false。

在上上章的EasyLock中释放实现如下。

@Override
protected boolean tryRelease(int arg) {
// 将当前线程清空
setExclusiveOwnerThread(null);
//将states设置为0
setState(0);
// 返回成功
return true;
}

2. 尝试唤醒后继线程unparkSuccessor()

此方法用于当前线程执行完毕后唤醒后继线程来获取资源。

private void unparkSuccessor(Node node) {
// node为当前线程所在的结点。
int ws = node.waitStatus;
// 该节点状态是有效的 就将其设置为初始化状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 找到下一个需要唤醒的结点
Node s = node.next;
//如果为空或已取消
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点往前找。 找到有效或者初始化的点,直到找到最前面的不是node的点。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果找的到 则唤醒该节点.
if (s != null)
LockSupport.unpark(s.thread);
}

也就是说 用unpark唤醒等待队列中最前的没有放弃的不是当前节点的节点线程

至此在独占模式下的解锁和上锁功能就将讲解完成了。

3. 共享模式获取资源( acquireShared(int i) )

在共享模式下线程获取资源的*入口,他会获取指定量的资源,如果全部释放 state=0了,那么他会唤醒等待队列的其他线程来获取资源。

通常执行流程是

  1. tryAcquireShared()尝试获取资源,成功则直接返回。
  2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

AQS返回值的语义定义:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。

public final void acquireShared(int arg) {
// 如果资源获取失败则放入队列
if (tryAcquireShared(arg) < 0)
// 共享方式放入等待队列
doAcquireShared(arg);
}

1. tryAcquireShared()

tryAcquireShared()依然要自定义同步器去实现。和tryAcquire()是一样的。

2.doAcquireShared()

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。

private void doAcquireShared(int arg) {
// 加入队列尾部 成功后返回当前节点
// 具体实现和上一节相同。
final Node node = addWaiter(Node.SHARED);
// 是否成功获取资源的标志
boolean failed = true;
try {
// 是否中断的标志
boolean interrupted = false;
// 开始自旋
for (;;) {
// 获取当前节点前驱节点
final Node p = node.predecessor();
// 如果当前节点就是头节点
if (p == head) {
// 尝试获取资源当前节点的状态
int r = tryAcquireShared(arg);
// 如果当前资源状态还有资源可以获取
if (r >= 0) {
// 将头指向自己拿到资源线程此时node被唤醒,
// 也可能是head用完资源后来唤醒自己的.即自己需要的资源数量大于当前空闲资源能提供的数量。
setHeadAndPropagate(node, r);
//将前头节点的后继节点置空 让GC好回收
p.next = null; // help GC
// 判断该线程是否被中断过
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 执行过程和独享模式的执行过程相同
// 先寻找休息的位置 然后将线程pack了
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

3.setHeadAndPropagate()

共享模式下每一个拿到资源的线程在自己苏醒的情况下,如果条件符合比如还有剩余资源,都会唤醒后继的线程。如果被唤醒的线程发现资源不够用时会再次进入休眠。即便排在首位线程后面的线程只需要少量的资源也会因为首位线程资源不够造成的休眠而等待。

比如说 假设老大用完了然后释放了5个资源,老二这时需要7个资源、老三需要2个资源、老4需要2个资源。因为老大使用资源完成后优先唤醒的是排在队伍后面的老二但是老二发现当前资源不够用。就将自己park了,而排在老二之后的老三老四也不会被唤醒了,这样体量巨大的老二就把远小于自己的老三老四阻塞在队列之后,而在老大之前运行的线程释放资源以后才会再次唤醒老二,在这两次唤醒之间5个资源一直是处于无人访问状态的。

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
// 将头节点指向自己
setHead(node)
// 如果还有剩余的资源,继续唤醒下一个队列中的线程。
if (propagate > 0 || h == null || h.waitStatus < 0) {
// 设置队列中下一个元素
Node s = node.next;
if (s == null || s.isShared())
// 唤醒后继节点
doReleaseShared();
}
}

** 也就是说在自己线程苏醒的时候,会依据条件唤醒后继的节点。这就是共享模式区别于单机模式的精髓所在 **

4.释放共享资源releaseShared()

public final boolean releaseShared(int arg) {
// 尝试释放资源
if (tryReleaseShared(arg)) {
// 唤醒后继节点
doReleaseShared();
return true;
}
return false;
}

1.唤醒后继节点

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 如果该节点为等待唤醒的节点 则执行唤醒操作
if (ws == Node.SIGNAL) {
// 将H节点的数据从被等待唤醒转换为 初始化
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){
continue;
}
// 唤醒线程
unparkSuccessor(h);
}
// 如果该节点处于初始化状态 将其转换为可运行状态
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// 如果
if (h == head)
break;
}
}

唤醒步骤:

  1. 进入自旋
  2. 如果头节点为空或者头节点不等于尾节点,则跳出自旋,说明等待队列已经空了,
  3. 判断当前头节点状态,如果是被标记为待唤醒状态的节点,初始化该节点。唤醒该节点的线程。
  4. 如果该节点是初始化状态则将其标记为可运行状态。当标记为PROPAGATE时会将唤醒流程传播下去。因为h == head不成立,就不会跳出循环。

总结

AQS源码中帮我们做好了线程排队、等待、唤醒等操作我们只需要重写决定如何获取和释放的锁。这是典型的模板方法。下一章我们讲讲AQS中其他好用的API

下面几篇我们将讲解下基于AQS同步框架的一些实现