Java并发之ReentrantReadWriteLock源码解析(二)

时间:2023-03-10 03:12:49
Java并发之ReentrantReadWriteLock源码解析(二)

先前,笔者和大家一起了解了ReentrantReadWriteLock的写锁实现,其实写锁本身实现的逻辑很少,基本上还是复用AQS内部的等待队列思想。下面,我们来看看ReentrantReadWriteLock的读锁实现。

当调用读锁的lock()方法时,会调用到Sync的父类AQS实现的acquireShared(int arg)方法,在这个方法又会调用子类实现的tryAcquireShared(arg)方法尝试获取读锁,如果返回大于等于0,则代表获取读锁成功,返回结果小于0代表获取读锁失败。则会执行doAcquireShared(arg)方法进入等待队列。

下面我们来看看在tryAcquireShared(arg)方法中是如何尝试获取读锁的,tryAcquireShared(arg)方法首先会获取读写锁当前状态c,如果exclusiveCount(c)的结果不为0则代表有线程占有写锁,会接着判断写锁的独占线程是否是当前请求读锁的线程,如果不是则进入<1>处的分支返回获取读锁失败。只有写锁不被其他线程持有,或者占有写锁的线程请求读锁,才可以跳过分支<1>前进到代码<2>处。

在代码<2>处会获取目前读锁被获取的次数,之后会执行下readerShouldBlock()判断当前请求读锁的线程是否应该阻塞,Sync并没有实现这个方法,而是交由子类公平锁和非公平锁实现,这个方法的实现一般是判断读写锁的等待队列中是否有线程,如果是公平锁的实现,只要队列中有等待读写锁的线程,就会判定需要阻塞当前读线程,如果是非公平锁的实现,就会判断当前队列中最前面需要锁的线程是读线程还是写线程,如果是读线程则就不阻塞,大家可以一起共享读锁,如果是写线程则需要阻塞。

之后会判断获取读锁的次数是否已到达MAX_COUNT,如果没有到达才会执行CAS尝试对获取读锁的次数+1,由于获取读锁的次数是存储在state的前16位,所以这里会加上SHARED_UNIT,并且这里我们也看到tryAcquireShared(int unused)方法给传入的获取次数用的变量命名是unused,读锁在这里并不使用外部传入的获取次数,因为这个获取次数可能大于1,会出现当前获取读锁的次数+1(SHARED_UNIT)刚好到MAX_COUNT,+2(2*SHARED_UNIT)超过MAX_COUNT。

如果判断当前请求读锁的线程不阻塞,当前获取读锁的次数小于MAX_COUNT且CAS对获取读锁+1成功,则会进入<3>处的分支,如果原先读锁被获取的次数为0,即r为0,代表当前线程是第一个获取读锁的线程,之前说过第一个获取读锁的线程会做一个优化,Sync的字段firstReader用于指向第一个获取读锁的线程,firstReaderHoldCount用于统计第一个线程获取读锁的次数,因为可能出现线程在获取读锁后又重新获取的情况。当判断线程是当前第一个获取读锁的线程,会进入<4>处的分支,将firstReader指向当前线程,firstReaderHoldCount赋值为1,代表当前线程获取一次读锁。如果原先读锁被获取的次数不为0,且当前获取读锁的线程为第一个获取读锁的线程,则代表读锁被重入,这里会进入<5>处的分支对firstReaderHoldCount+1。

如果<4>、<5>两个条件都不满足,原先读锁的获取次数不为0,且当前获取读锁的线程不是第一个获取读锁的线程,则会进入<6>处的分支,这里会先获取cachedHoldCounter指向的HoldCounter对象,cachedHoldCounter会指向最后一个获取读锁线程对应的HoldCounter对象(除了第一个获取读锁的线程外),HoldCounter对象用于存储不同线程获取读锁的次数。如果rh为null,或者rh的线程id不是当前当前的线程id,这里会进入<7>处,获取线程局部变量HoldCounter对象,并赋值给cachedHoldCounter。

之后判断rh指向的HoldCounter对象获取锁的次数是否为0,如果为0会调用ThreadLocal.set(T value)保存进线程的局部变量,大家思考下为什么这里要调用ThreadLocal.set(T value)保存局部变量呢?如果当前读锁已经有线程在共享,当有新的线程获取到读锁后会调用readHolds.get()方法执行ThreadLocalHoldCounter.initialValue()方法生成一个HoldCounter对象,虽然此时HoldCounter对象的获取读锁次数count为0,但此时这个对象也缓存在线程的局部变量中了,为什么这里还要调用ThreadLocal.set(T value)保存局部变量呢?这是因为在线程释放读锁的时候,如果判断rh.count为0,会将线程对应的HoldCounter对象从线程局部变量中移除。这里可能出现cachedHoldCounter指向最后一个获取读锁的线程的HoldCounter对象,线程释放读锁后将HoldCounter对象从局部变量中移除,但此时cachedHoldCounter依旧指向原先线程对应HoldCounter对象,并且线程在释放读锁后又重新获取读锁,且期间没有其他线程获取读锁,所以这里判断cachedHoldCounter指向的对象的线程id和当前线程id相同,就不会再调用readHolds.get()生成新的HoldCounter对象,而是复用旧的HoldCounter对象,如果HoldCounter为0,除了是新生成的,也有可能是上面所说的情况,这里会重新保存HoldCounter对象到线程的局部变量中。之后对HoldCounter对象的count字段+1表示获取一次读锁,最后返回1表示获取读锁成功。

上面的流程仅针对获取读锁较为顺利的情况,但在高并发场景下,很多事情都难以预料,比如在<3>处的readerShouldBlock() 方法返回线程应该阻塞,或者获取读锁的次数已经到达MAX_COUNT,又或者执行CAS的时候失败,有别的线程先当前线程获取了读锁或者写锁,当前读写锁的状态已经和原始的状态c不同了。只要出现这些情况都无法进入<3>处的分支。如果线程应该阻塞、读锁获取次数到达上限,又或者写锁被占有这些条件倒也罢了,如果仅仅是有线程先当前线程获取了读锁改变了读写锁状态state,导致<3>处的CAS失败从而无法获取读锁,则显得有些憋屈,因为读锁是允许共享的。因此,这里还会在执行<9>处的fullTryAcquireShared(current)避免出现CAS未命中的情况。

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
abstract boolean readerShouldBlock();
//...
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)//<1>
return -1;
int r = sharedCount(c);//<2>
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {//<3>
if (r == 0) {//<4>
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//<5>
firstReaderHoldCount++;
} else {//<6>
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))//<7>
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)//<8>
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);//<9>
}
//...
}
//...
public static class ReadLock implements Lock, java.io.Serializable {
//...
public void lock() {
sync.acquireShared(1);
}
//...
}
//...
} public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
//...
}

    

fullTryAcquireShared(Thread current)是对尝试获取读锁的再一重保障,实现思路其实也与上面的tryAcquireShared(int unused)很类似,只是多了一个轮询直到判断获取读锁成功,或者读写锁状态不允许获取读锁。

这里依旧是在<1>处判断当前是否有线程占有写锁,如果写锁被占有且独占写锁的线程不是当前请求读锁的线程则退出。再判断写锁没有被占有的前提下,再判断读线程是否应该被阻塞,一般只有两种情况会进入到分支<2>,当前有线程正在共享读写锁的读锁,如果公平锁发现目前等待队列中有线程,这里会判断请求读锁的线程应该阻塞,如果是非公平锁,则判断等待队列中是否有线程,如果有的话等待时长最久的线程是否请求写锁,如果是的话则要阻塞当前请求读锁的线程,如果不是当前请求读锁的线程可以和其他线程一起共享读锁。所以这个分支是针对目前已经有线程共享读锁,且等待队列中有线程,又有新的线程来请求读锁做的判断。如果出现这种情况则进入到分支<2>处,这里会判断下请求线程对应的HoldCounter对象的获取读锁次数是否为0,正常情况应该为0,会把HoldCounter对象从局部变量中移除,之后判断获取读锁次数为0,则返回-1表示获取读锁失败。正常情况下第一个获取读锁的线程是不会进入到分支<2>,而除了第一个获取读锁的线程外,其他已经获取读锁的线程如果又重入读锁,是有会进入到分支<2>的,但这里会判断不是第一个线程,于是跳过分支<3>会进入分支<4>,又因为已经获取到读锁只是重入读锁的线程对应的获取读锁次数不为0,所以对应的HoldCounter对象不会被移除,也不会判断获取读锁次数为0而返回。

如果判断当前线程不应阻塞,或者当前线程应当阻塞后又发现当前线程早已获取到读锁,会继而执行到<5>处的代码,判断如果读锁被获取的次数已达上限则报错。如果未达上限,则会执行<6>处的CAS对读锁的获取次数+1(SHARED_UNIT),如果CAS成功则会增加当前线程对读锁的获取次数。如果<6>处的CAS失败,可能有别的线程先当前线程修改了读写锁的状态,这里会重新开始一轮新的循环,直到成功获取到读锁,或者判断有别的线程占有了写锁。

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {//<1>
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//<2>
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {//<3>
// assert firstReaderHoldCount > 0;
} else {//<4>
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)//<5>
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {//<6>
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
//...
}
//...
}

  

下面我们再来看看公平锁和非公平锁是如何判断是否应该阻塞当前请求读锁的线程, 首先可以看到公平锁的实现非常简单,就仅仅是判断队列中是否有等待线程,哪怕这些线程都是读线程,可以一起共享读锁,公平锁也会要求当前请求读锁的线程先入队等待,直到它前一个线程获取读锁后唤醒自己。而非公平锁则是先判断队列中头节点的后继节点是否为null,如果非空再判断是否是读锁,如果是写锁那当前请求读锁的线程只能先乖乖入队,如果当前线程和头节点的后继节点同为读线程,就判断不阻塞,当前线程可以尝试获取读锁。非公平锁相较公平锁,多了一种高并发下队列中的线程被无限期推迟的可能,如果头节点的后继节点是写线程倒也好说,读线程只能乖乖入队,不会延期写线程获取写锁,但如果后继节点为读线程,且不断有新的读线程成功获取读锁,那么后继节点的读线程将被延期,因为每次尝试用CAS修改读写锁的状态都会失败,这里的延期也包括后继节点之后的所有节点,不管是共享节点还是独占节点。

static final class FairSync extends Sync {
//...
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
} static final class NonfairSync extends Sync {
//...
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
} public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
//...
}

  

在了解读锁是如何加锁后,我们来看看如何释放读锁。当调用读锁(ReadLock)的unlock()方法时,会调用Sync的父类AQS实现的releaseShared(int arg)方法,在这个方法又会先调用子类实现的tryReleaseShared(arg)释放读锁,如果释放成功后,再调用doReleaseShared()尝试唤醒后继节点。下面我们就来了解下Sync类实现的tryReleaseShared(int unused),看看这个方法是如何释放读锁的。

在Sync的tryReleaseShared(int unused)方法中,会先判断当前释放读锁的线程是否是第一个获取读锁的线程,如果是的话则进入<1>处的分支,判断第一个线程获取读锁的次数,如果大于1,代表第一个线程在第一次获取读锁后又重入读锁,这里简单地对获取读锁的次数-1,如果判断释放的时候获取次数是1,则代表第一个线程将完全释放读锁,这里会置空firstReader的指向,但不会将firstReaderHoldCount清零,因为当所有读线程都完全释放读锁后,如果再有新的读线程请求读锁,会重新对firstReader、firstReaderHoldCount赋值。

如果释放读锁的线程不是第一个线程,会先获取cachedHoldCounter对象,即上一个获取读锁线程对应的HoldCounter对象,判断HoldCounter对象对应的线程是否是当前线程,如果是的话则不需要去线程局部变量查找与之对应的HoldCounter,如果不是则需要查找,在确定好线程对应的HoldCounter对象后,如果判断线程对锁的获取次数是1,则代表当前线程将完全释放读锁,这里会将HoldCounter对象从局部变量移除,再判断HoldCounter对象的获取次数是否为0,如果为0则代表当前线程没有先获取读锁就先释放读锁,这里会抛出unmatchedUnlockException异常。之后会对线程获取锁的次数-1。

最后会用CAS的方式对读写锁的读锁被获取数量-1(SHARED_UNIT),这里可能存在读线程并发释放读锁的情况,所以这里可能存在CAS失败的情况,如果这里失败则会一直轮询直到CAS成功,如果CAS成功则判断当前状态是否有线程占有读锁或者写锁,如果CAS成功后读写锁的状态为0,代表当前无读锁也无写锁,则会返回读锁被完全释放。

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {//<1>
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {//<2>
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {//<3>
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
//...
}
//...
public static class ReadLock implements Lock, java.io.Serializable {
//...
public void unlock() {
sync.releaseShared(1);
}
//...
}
//...
} public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//...
}

  

读锁的tryReadLock()方法只以非公平的方式获取读写锁,不管其本身实现是公平锁还是非公平锁,这里会直接调用Sync实现的tryReadLock()方法,在这个方法中会先判断当前是否有线程占有写锁,如果有线程占有写锁,且当前请求读锁的线程和占有写锁的线程不是同一个,这里会返回获取读写锁失败。否则会判断当前读锁被获取次数是否已达上限MAX_COUNT,达到上限则报错。如果读锁被获取次数未达上限,才可以用CAS的方式对读锁的获取次数+1(SHARED_UNIT),如果CAS失败,代表当前有其他线程一起获取读锁,状态c已经被改变,会重新开始新的一轮尝试获取读锁流程。如果CAS成功则会增加线程对读锁的引用次数,之所以有这个HoldCounter对象,或者用firstReaderHoldCount字段统计第一个线程引用所的次数,主要是为了确保在线程执行释放读锁的时候,线程一定是之前获取过读锁的线程。如果读锁不能保证释放读锁的线程一定是之前获取过读锁的线程,则会出现线程A获取了读锁但尚未释放,此时线程B未获取读锁但直接释放读锁,读写锁状态回到0,可由读线程或者写线程获取,线程C获取写锁,那就出现了本章最开始讲的,一个线程在访问资源,另一个线程在修改资源,这是非常危险的。

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
abstract static class Sync extends AbstractQueuedSynchronizer {
//...
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
//...
}
//...
public static class ReadLock implements Lock, java.io.Serializable {
//...
public boolean tryLock() {
return sync.tryReadLock();
}
}
//...
}

  

最后,我们简单看下读锁的tryLock(long timeout, TimeUnit unit)方法,这个方法会调用到AQS实现的tryAcquireSharedNanos(int arg, long nanosTimeout),相信大家看到这个方法的实现不会陌生,首先会调用子类实现的tryAcquireShared(arg)尝试获取读锁,如果获取失败,则会调用doAcquireSharedNanos(arg, nanosTimeout)尝试将当前请求读锁的线程挂起。这两个方法先前已经讲过,这里就不再赘述。

public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
//...
public static class ReadLock implements Lock, java.io.Serializable {
//...
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//...
}
//...
} public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
//...
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
//...
}

  

至此,读写锁的源码解析到此结束,这里很多AQS的方法之前笔者在ReentrantLockSemaphore的源码解析已经讲过,请在看此章之前一定一定一定要先去看或者两个章节的源码解析。如果有彻底理解这两个章节,大家就会知道其实不管是可重入互斥锁、信号量、可重入读写锁本身实现的业务并不多,它们的核心思想就是把线程视为一个个可入队的节点,只是这些节点有的会独占互斥锁或者写锁,有的可以和别的节点一起共享一个锁,通过用不同的角度看待AQS,可以实现适用于不同场景下的并发类。