Java多线程 -- JUC包源码分析4 -- 各种锁与无锁

时间:2022-09-21 10:35:17

说到锁,想必很多人就会被如下的各种专业名词绕晕了。本文试图厘清关于锁的各种概念,从而对锁有一个全面而深入的理解。

自旋锁/阻塞锁
独占锁(排它锁)/共享锁(读写锁)
公平锁/非公平锁
偏向锁/非偏向锁
可重入锁
悲观锁/乐观锁
ReentrantLock源码分析
AbstractQueuedSynchronized(队列同步器) –源码分析
ReentrantReadWriteLock源码分析
无锁
总结:synchronized关键字 与 Lock的区别


自旋锁与阻塞锁

自旋锁: 线程拿不到锁的时候,CPU空转,不放弃CPU,等待别的线程释放锁。前面所讲的CAS乐观锁,即是自旋锁的典型例子。
很显然,自旋锁只有在多CPU情况下,才可能使用。如果单CPU,占着不放,其他程序就没办法调度了。

阻塞锁: 线程拿不到锁的时候,放弃CPU,进入阻塞状态。等别的线程释放锁之后,再幻醒此线程,调度到CPU上执行。
synchonized关键字,Lock,都是阻塞锁(注:这里说synchonized关键字是阻塞锁,不准确,后面会再详细阐述)

自旋锁相比阻塞锁,少了线程切换的时间,因此性能可能更高。但如果自旋很多次,也拿不到锁,则会浪费CPU的资源。

在Java中,有1个 -XX:UseSpining 参数, 缺省为10。此参数用来指定,在JVM内部,自旋锁的cpu空转次数。

独占锁与共享锁(读写锁)

独占锁:Synchronized/ReentrantLock都是独占锁,或者叫“排他锁”。1个线程拿到锁之后,其他线程只能等待。
“读”与“读”互斥,“读”与“写”互斥,“写”与“写”互斥

共享锁:在笔者看来,共享锁和“读写锁”就是一个概念,读写分离。
“读”与“读”不互斥,“读”与“写”互斥,“写”与“写”互斥

因为“读”与“读”不互斥,所以1个线程拿到“读”锁之后,其他线程也可以拿到“读”锁,因此“读”锁是共享的。

但1个线程拿到”读“锁之后,另1个线程,拿不到”写“锁;反之亦然!

公平锁与非公平锁

大家都知道,ReentrantLock的构造函数,有个参数,指定锁是公平的,还是非公平的,缺省是非公平的。

    public ReentrantLock(boolean fair) {
sync = (fair)? new FairSync() : new NonfairSync();
}

那到底什么叫公平/非公平呢?
打个比喻:你去火车站买票,买票窗口前排了1个长队。
非公平:去了之后,直奔窗口,问有没有你要的票,有的话,买了走人;没有,再排队。
公平:去了之后,排在队伍末尾,直到前面的人都买完之后,你再买。

而Lock的公平/非公平,就是指当前线程去拿锁(去火车站买票)的时候的策略。关于Lock的源码,后面会详细分析。

偏向锁与非偏向锁

后面结合源码讲述

可重入锁

所谓可重入,就是指某个线程在拿到锁之后,在锁的代码块内部,再次去拿该锁,仍可拿到。
synchronized关键字, Lock都是可重入锁。
因此你可以在synchronized方法内部,调用另外一个synchronized方法;在lock.lock()的代码块里面,再次调用lock.lock()。

悲观锁与乐观锁

前面已讲

ReentrantLock的lock源码分析

队列同步器基本原理

在深入分析源码之前,先分别看一下的类图:
ReentrantLock类图:
Java多线程 -- JUC包源码分析4 -- 各种锁与无锁

ReentrantReadWriteLock类图:
Java多线程 -- JUC包源码分析4 -- 各种锁与无锁

从类图中,可以看到2点:
1. 公平锁/非公平锁的实现,分别用了FairSync/NonFairSync
2. 使用了一个共同的部件 AbstractQueuedSynchronizer,队列同步器。这是整个锁实现的核心部件,它的基本原理如下:

(1) 内部有一个state变量 + Thread exclusiveOwnerThread 变量
state = 0 //表示没有线程持有该锁
state = 1 //表示有1个线程,持有该锁
state > 1 //表示同1个线程,多次持有该锁,也就是”可重入“。
每一次lock(),state++; unlock(), state–;
(2) exclusiveOwnerThread变量,记录当前持有该锁的Thread
(3)当某个线程要拿锁,却发现state > 0时,进入阻塞队列。等待state = 0的时候,队列中某个线程被唤起。

关于”队列同步器“的源码,后面会详细分析

公平锁与非公平锁实现上的差别

先从构造函数入手:

    public ReentrantLock(boolean fair) {
//根据fair的取值,构建对应的NonfairSync(),或者FairSync()
sync = (fair)? new FairSync() : new NonfairSync();
}

再看lock()函数:

    public void lock() {
sync.lock(); //lock()是abstract的,分别由NonFairSync.lock(), FairSync.lock()实现
}

接下来,比较一下2者的lock()有何差异:

    final static class FairSync extends Sync {
...
final void lock() {
acquire(1); //上来就排队等,”公平“
}

...
}

final static class NonfairSync extends Sync {

final void lock() {
if (compareAndSetState(0, 1)) //上来就抢1次锁,如果state = 0,就抢到手了。把锁的拥有者,置为当前线程。(这也就是”非公平“的由来)
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //抢不到,再排队等
}

。。。
}

接下来比较一下acquire()函数的差异:acquire()是定义在基类AbstractQueuedSynchronizer里面的,里面调用的tryAcquire()是abstract的

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {


。。。
public final void acquire(int arg) {
if (!tryAcquire(arg) && //这里的tryAcquire()是abstract的
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

。。。
}

接下来比较2者在tryAcquire()实现上的差别:

    final static class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/

final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

final boolean Sync::nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) { //没人持锁,直接抢。非公平!!!
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //可重入锁的特性:当前锁是自己占有的,直接state++,赋值就可以了
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; //c > 0 && current != getExclusiveOwnThread(),说明锁被别的线程持有,直接返回false
}
    final static class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}

/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (isFirst(current) && //没人持有,并且自己排在队列的第1个,再考虑申请锁。“公平”!!
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { //可重入锁的特性:当前锁是自己持有,再次加锁,不走加锁流程,直接state++就可以了。
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false; //c > 0 && current != getExclusiveOwnThread(),说明锁被别的线程持有,直接返回false
}
}

小结:tryAcquire()主要做了2件事:
(1)没人持有锁的情况下,通过CAS去抢锁。公平/非公平,抢法不一样:非公平,直接抢;公平,自己排在队列第1个,才去抢。
如果已经有人持有锁,函数之间返回false;

(2)实现了锁的可重入性:发现state > 0,并且锁是自己持有的,再次lock(),不走整个的lock()流程,直接state++就可以了。

接下来就是要讲的第3件事:拿不到锁的时候,如何排队?回到上面的代码:

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {


。。。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //没有拿到锁,把线程阻塞,放入阻塞队列
selfInterrupt();
}


。。。
}

接下来就涉及到“队列同步器”的核心部分:

AbstractQueuedSynchronized(AQS 队列同步器)

park/unpark 操作原语

在分析上面的acquireQueued函数之前,先来看一对关键的操作原语:
LockSupport.park/unpark

public class LockSupport {
public static void park(Object blocker) { //传入的是一个线程对象
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}

public static void unpark(Thread thread) { //传入的是一个线程对象
if (thread != null)
unsafe.unpark(thread);
}

park()/unpark()的底层,是native代码,用pthread_mutex实现的,在此不再深究。再此主要想讲一下它的有趣的特性:

你可能注意到,它是一对static函数,并且传入的参数是一个Thread。不同于synchrnozied/lock,作用在数据上面,间接的让线程阻塞; 它是直接作用在线程上面,可以精确控制让某个Thread阻塞,唤醒。

得益于LockSupport的这对强大特性,线程同步器,可以让所有阻塞的线程形成一个链表(也就是阻塞队列),然后有选择性的阻塞、唤醒某个线程。

屏蔽中断

我们知道lock()是不响应中断的,lockInterruptibly才会响应中断,那它内部是如何做到的呢?
答案就在下面的死循环里面:

    public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

//addWaiter()方法内部,就是把当前Thread对象,包装成一个Node,放在链表尾部。关于这个链表的实现,稍后再讲解,接下来,重点关注acquireQueued方法

//线程拿到锁之后,才会从acquireQueued方法退出。acquireQueued方法的返回值,告诉了,在这段一直阻塞,拿锁的时间里面,有没有人向自己发过中断。如果有,接下来调用selfInterrupt()响应中断。
//所谓的响应中断,其实就是把自己的中断标志位设为true。后续调用sleep(), wait()方法时,就会抛出InterruptedException

selfInterrupt();
}


final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) { //每次线程被唤醒之后(2种唤醒机制),去判断自己是否在链表头部,同时去拿锁。拿不到锁,再次park。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; //线程被中断唤醒,把interupt置为true,继续死循环
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
}

private final boolean parkAndCheckInterrupt() {
//线程在下面这个地方被阻塞。那它什么时候被唤醒呢?
//2种情况:
//case 1: 其他线程调用了unpark
//case 2: 其他线程调用了interrupt。这里要注意的是,lock()不能响应中断,但LockSupport.park会响应中断的

LockSupport.park(this);

//被唤醒之后,执行该行代码。判断自己是否因为case 2被唤醒的!
return Thread.interrupted();
}

至此,我们完整分析了ReentrantLock.lock()函数。总结下来,有以下几个关键点:
(1)公平/非公平的实现
(2)park/unpark原语
(3)线程组成的阻塞队列 – 无锁链表
(4)死循环,实现中断的屏蔽

ReentrantLock的unlock源码分析

接下来,分析ReentrantLock.unlock(),unlock的实现比较简单

    public void unlock() {
sync.release(1);
}

//AQS里面
public final boolean release(int arg) {
if (tryRelease(arg)) { //因为是可重入锁。所以tryRelease()会调用多次,直到state减到0,tryRelease才可能返回true
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //如果释放成功,唤醒链表中的后继者
return true;
}
return false;
}

//Sync里面:对于unlock,我们看到,就没有公平/非公平的区分了
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //state--
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { //因为是可重入锁,直到减到c == 0 为止
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

ReentrantLock的lockInterruptibly源码分析

从上面代码可以看出,lockInterruptibly至所以可以实现响应中断,是因为LockSupport.park是会响应中断的。

    public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg)) //tryAcquire刚刚已经分析过
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
break; //和之前的根本区别:遇到中断之后,直接从死循环退出了, 而不是循环去拿锁。
}
} catch (RuntimeException ex) {
cancelAcquire(node);
throw ex;
}
// Arrive here only if interrupted
cancelAcquire(node);
throw new InterruptedException();
}

无锁链表

至此,关于RetrantLock的3个核心函数:lock/unlock/lockInteruptibly分析完了,接下来分析一下AQS中的那个阻塞线程所组成的队列 – 无锁链表

    private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

private Node enq(final Node node) {
for (;;) { //跟AtomicInteger原理类似,乐观锁(死循环 + CAS)
Node t = tail;
if (t == null) { // Must initialize
Node h = new Node(); // Dummy header
h.next = node;
node.prev = h;
if (compareAndSetHead(h)) {
tail = node;
return h;
}
}
else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

ReentrantReadWriteLock源代码分析

如上面类图所示,其内部有2把锁, ReadLock 和 WriteLock。但2把锁,引用的是一个共同的Sync,还是通过一个AQS里面的state变量来实现。

    static abstract class Sync extends AbstractQueuedSynchronizer {

。。。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;


//关键点:把state变量拆成了2半,低16位,用来记录持有“写”锁,也即排它锁的线程;高16位,用来记录持有“读”锁,也即共享锁的线程数目。
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

下面分别看一下,ReadLock.lock(), WriteLock.lock()分别是如何实现的:

lock

    public static class ReadLock implements Lock, java.io.Serializable {
...
public void lock() {
sync.acquireShared(1);
}

...
}

//AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

//Sync
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 && //获取“读”锁时,已经有“写”锁,并且“写”锁不是自己的,则不能获取读锁,直接返回
getExclusiveOwnerThread() != current)
return -1;
if (sharedCount(c) == MAX_COUNT) //“读”锁数目,超过最大值,返回
throw new Error("Maximum lock count exceeded");
if (!readerShouldBlock(current) &&
compareAndSetState(c, c + SHARED_UNIT)) { //获取读锁成功
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
rh.count++;
return 1;
}
return fullTryAcquireShared(current);
}
    public static class WriteLock implements Lock, java.io.Serializable {

private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}


public void lock() {
sync.acquire(1);
}
}

protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) { //c != 0,说明要么上了"读”锁,要么上了“写”锁,或者2者都有
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) //c != 0 && w == 0,说明上了读锁,并且不是自己,则无法获取写锁,直接返回
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if ((w == 0 && writerShouldBlock(current)) || //没有线程持有写锁,则尝试加锁
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

关键点:
(1)从上面2种lock()代码可以看出,”读”与“读”是不互斥的,“读”与“写”互斥,“写”与“写”互斥。
(2)只有1种case,读锁与写锁不互斥。就是同1个线程,同时获得读锁和写锁,是允许的。

关于unlock,原理和ReentrantLock的类似,在此不再详述。

总结:synchronized关键字与Lock的区别

经过上述分析,我们可以总结出,Lock的实现,有以下几个关键原理:
(1)通过CAS操作state变量,实现加锁功能
对于ReentrantLock,state取0,1,>1 3类值,分别代码了锁的3种状态。
对于ReentrantReadWriteLock, state的低16位,代表写锁的状态;高16位,代表读锁的状态
(2)park/unpark原语,实现线程的阻塞/唤醒
(3)无锁链表实现的队列,把所有阻塞的线程串在一起
(4)对中断的处理:通过死循环,被中断唤醒的线程,再次加锁,再次阻塞。。

那它和synchronized关键字有什么区别呢?
虽然synchronized是语言层面的,Lock是上层应用层面的一个库,其实从底层实现原理来讲,是一样的,都是pthread mutex。但在使用层面,Lock更灵活:
(1)有公平/非公平策略
(2)有tryLock(),非阻塞方式。Synchronized只有阻塞方式
(3)有lockInterruptibly,可以响应中断。Synchronized不能响应中断
(4)Lock对应的Condition,其使用方式要比Synchronized对应的wait()/notify()更加灵活。

除此之外,2者加锁后,thread所处的状态是不一样的:
synchronized加锁,thread是处于Blocked状态;
Lock枷锁,thread是处于Waiting状态!

另外一个不一样的是,synchronized关键字内部有自旋机制,先自旋,超过次数,再阻塞;Lock里面没有用自旋机制。