并发编程学习笔记(6)----公平锁和ReentrantReadWriteLock使用及原理

时间:2023-03-09 08:50:46
并发编程学习笔记(6)----公平锁和ReentrantReadWriteLock使用及原理

(一)公平锁

  1、什么是公平锁?

  公平锁指的是在某个线程释放锁之后,等待的线程获取锁的策略是以请求获取锁的时间为标准的,即使先请求获取锁的线程先拿到锁。

  2、在java中的实现?

  在java的并发包中提供了ReentrantLock提供了重入锁并且也提供了公平锁(FairSync)和非公平锁(NonfairSync)。

  RenntranLock类的构造方法可传入一个boolean值作为标记是否是否用公平锁,默认是非公平的,非公平锁我们与我们之前学习时实现的可重入锁原理相似,这里就不再详说,接下来看看公平锁的源码:

 protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

FairSync重写了AQS中的tryAcquire()方法,跟我们前面的知识点中的需要什么要的锁就重写tryXX方法吻合。这里看代码。

可以看到这里跟我们前两两章学习到的非公平锁的实现只多了一个hasQueuePredecessors()方法的判断,这个方法就是判断当前线程的前一个线程是否也有资格去获取锁,只有没有资格获取锁时,当前的线程才可以返回true.否则就不能获取锁,接下来看看hasQueuePredecessors()方法:

public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

这段代码表示如果当链表为空时,是肯定可以获取的锁的,所以h!=t返回false,前面的方法会继续向下执行,链表不为空时,当头节点的下一个节点为空时,没有可以执行的线程,不能获取锁,当头节点的下一个节点不为空时,但是头节点的下一个节点的线程不等于当前线程,这样也是不能获取的,表示当前线程的前面还有先进来的线程在等待,所以不能获取锁,这样实现了一个公平锁。公平锁的使用只需要在实例化ReentrantLock的时候传入true即可。

(二)读写锁(ReentrantReadWriteLock)的使用及原理。

  1、什么是读写锁?

  在实际的应用中,读的操作是远远大于写操作的,并且读操作是不会产生线程安全问题的,如果我们给读和写的所有线程都加上互斥锁,那么在读的过程中会影响很大的性能,所以在java中提供了读写锁,读写锁分为读锁和写锁,其中读锁和读锁共享,读锁和写锁互斥,写锁和写锁互斥,这也是前面AQS的独享和共享模式的具体实现。

  2、读写锁的运用方式

  直接上代码:

package com.wangx.thread.t5;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; public class Demo { private Map<String,Object> map = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock write = readWriteLock.writeLock();
private Lock read = readWriteLock.readLock();
public Object get (String key) {
read.lock();
System.out.println(Thread.currentThread().getName() + "读操作在执行...");
try {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return map.get(key);
} finally {
read.unlock();
System.out.println(Thread.currentThread().getName() + "读操作执行完毕...");
}
} public void put(String key, Object value) {
write.lock();
System.out.println(Thread.currentThread().getName() + "写操作在执行...");
try {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
} finally {
write.unlock();
System.out.println(Thread.currentThread().getName() + "写操作执行完毕...");
} } }

由于hashMap是线程不安全的,但是又是需要读写都进行的,所以使用map的get和put方法可以很好的模拟线程安全问题和读写锁的使用,运用很简单,就是在线实例化ReentrantReadWriteLock,在通过该对象分别获取读写锁,在读的地方加上读锁,写的地方加上写锁。

  3、读写锁源码实现

  首先获取看写锁源码:

 public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
} public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }

ReentrantReadWriteLock的writeLock方法返回writeLock,在构造方法中又实例化了WriterLock,WriteLock是ReentrantReadWriteLock的内部类,实现了Lock接口,保证了锁的所有方法的功能。再看WriteLock中的lock方法,

 public void lock() {
sync.acquire(1);
}

ReentrantReadWriteLock的内部帮助器,所以这里调用的实际上是外部类的Sync内部类,接着进入该类的acquire()方法:

protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

继承AQS框架的实现都是通过操作state来进行锁的获取和释放,所以getState()方法就不说了,接下来看看exclusiveCount()方法:

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

这里是通过十六位来保存读锁和写锁的,高8位保存写锁,低八位保存读锁,exclusiveCount()传入1时返回值为本身。表示当前只能有一个线程持有独享锁。c等于0时,第一个线程第一次进来,操作和原理与之前的自己实现锁类似,就不详细说了,当c不等于0时,表示此时是有线程在执行的,但是w等于0则表示当前执行的是持有共享锁的县城,或者当前线程不是重入线程的情况下,返回false,获取线程失败,并且当写线程个数操过了最大数,也会获取锁失败,否则更改状态,返回true,获取重入锁成功。这就是写锁的实现,就是判断在写线程的情况下是否有其他线程在使用锁。

  写线程的unlock也是调用WriteLock的中的unlock,最后调用sync中的tryRelease方法,所以重点来看tryRelease方法:

protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

当线程释放时,先判断,如果不是独享线程,肯定是有异常的,直接抛出异常,这个释放方法只能是独享线程来调用,做重线程的减减操作,当持有锁的线程为0,切重入锁为0时,释放锁,更改状态。完成写锁的释放。

  读锁的实现源码,前面说过,读锁就是AQS的共享模式的具体运用,跟写锁类似,我们直接在ReentrantReadWriteLock的内部类的Sync中找到tryAcquireShared()方法:

protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}

第一步:判断是否有线程持有独享锁的存在,即exclusiveCount不为0,并且当前锁不是独享锁,返回-1表示获取锁失败。

第二步:获取持有共享锁的线程数量,readerShouldBlock()线程公平的判断,判断当前持有共享锁的线程是否小于最大线程数,是更改状态。

第三步:当r等于0时,表示当前线程是第一个,则将第一个线程保存,并将第一个线程的重入次数计为1。如果进来的线程是第一个的线程的重入,则重入次数加1。

第四步:HoldCounter用来保存持有持有共享锁的线程个数及线程id,当第二个能共享的线程进入时,rh==null时,readHolds获取到当前线程,readHolds通过ThreadLocal<HoldCounter> 保证线程安全。

第五步:当rh != null 时,且rh.tid != getThreadId(current),即缓存的线程不是当前线程,即是进来的一个新的持共享锁的线程,则也获取当前线程的信息存到cachedHoldCounter中。

第六步:以上条件不成立时,表示是一个重入线程进入,当他的重入次数为0,表示第一次进入,将其添加到readHolds中,并将count++记录某个线程重入的次数,这样就能保证存入的共享线程个数和每个线程重入的次数。所有操作成功,则放回1,表示获取锁成功,否则fullTryAcquireShared()对获取失败的各种原因进行处理,最后返回结果。

  tryReleaseShared()方法:

 protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

这里只分析跟独享锁不同的部分,如果当前线程是第一个进来的共享线程,且重入次数为1时,释放该线程的共享锁,否则重入次数减1,当前线程不等于缓存的线程时,获取readholds中的线程,得到获取到的线程冲入次数,当小于等于1时移除该线程,也就是释放该线程锁,小于等于0时抛出异常,不为1时则重入次数减减,当持有共享锁的所有线程都移除后,不断自旋,直到成功释放锁,返回状态nextc是否为0。表示该锁是否可以释放。这样就成功地释放了共享锁。

(三)这里有个降级锁,就是在读和写可能在同一个操作中是,我们需要将写锁降级为读锁,以保证线程安全,示例代码:

package com.wangx.thread.t6;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; public class Demo { private Map<String,Object> map = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock write = readWriteLock.writeLock();
private Lock read = readWriteLock.readLock();
private volatile boolean isUpdate; public void readWrite() {
read.lock();
if (isUpdate) {
read.unlock();
write.lock();
map.put("xxx", "xxx");
read.lock();
write.unlock();
}
Object object = map.get("xxx");
System.out.println(object);
read.unlock();
}
}

以上就是读写锁和公平锁我所了解的知识,今天的分享就到此为止,不足之处,忘各位指出。

原文 并发编程学习笔记(6)----公平锁和ReentrantReadWriteLock使用及原理