【1】【JUC】Condition和生产者消费者模型

时间:2022-04-18 13:01:38

本篇文章将介绍Condition的实现原理和基本使用方法,基本过程如下:
1、Condition提供了await()方法将当前线程阻塞,并提供signal()方法支持另外一个线程将已经阻塞的线程唤醒。
2、Condition需要结合Lock使用
3、线程调用await()方法前必须获取锁,调用await()方法时,将线程构造成节点加入等待队列,同时释放锁,并挂起当前线程
4、其他线程调用signal()方法前也必须获取锁,当执行signal()方法时将等待队列的节点移入到同步队列,当线程退出临界区释放锁的时候(ReentrantLock.unlock),唤醒同步队列的首个节点

【1】【JUC】Condition和生产者消费者模型

【1】【JUC】Condition和生产者消费者模型

关于Condition接口

在并发编程中,每个Java对象都存在一组监视器方法,如wait()notify()以及notifyAll()方法,通过这些方法,我们可以实现线程间通信与协作(也称为等待唤醒机制),如生产者-消费者模式,而且这些方法必须配合着synchronized关键字使用(参看:java 为什么wait(),notify(),notifyAll()必须在同步(Synchronized)方法/代码块中调用?,关于这点,如果想有更深入的理解,可观看博主另外一篇博文【深入理解Java并发之synchronized实现原理】,与synchronized(+wait()、notify())的等待唤醒机制相比Condition(+Lock+await()、signal())具有更多的灵活性以及精确性,这是因为notify()在唤醒线程时是随机(同一个锁),而Condition则可通过多个Condition实例对象建立更加精细的线程控制(Lock,Condition ,await,signal),也就带来了更多灵活性了,我们可以简单理解为以下两点

  • 通过Condition能够精细的控制多线程的休眠与唤醒。

  • 对于一个锁,我们可以为多个线程间建立不同的Condition。

Condition是一个接口类,其主要方法如下:

public interface Condition {

 /**
* 使当前线程进入等待状态直到被通知(signal)或中断
* 当其他线程调用singal()或singalAll()方法时,该线程将被唤醒
* 当其他线程调用interrupt()方法中断当前线程
* await()相当于synchronized等待唤醒机制中的wait()方法
*/
void await() throws InterruptedException; //当前线程进入等待状态,直到被唤醒,该方法不响应中断要求
void awaitUninterruptibly(); //调用该方法,当前线程进入等待状态,直到被唤醒或被中断或超时
//其中nanosTimeout指的等待超时时间,单位纳秒
long awaitNanos(long nanosTimeout) throws InterruptedException; //同awaitNanos,但可以指明时间单位
boolean await(long time, TimeUnit unit) throws InterruptedException; //调用该方法当前线程进入等待状态,直到被唤醒、中断或到达某个时
//间期限(deadline),如果没到指定时间就被唤醒,返回true,其他情况返回false
boolean awaitUntil(Date deadline) throws InterruptedException; //唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须
//获取与Condition相关联的锁,功能与notify()相同
void signal(); //唤醒所有等待在Condition上的线程,该线程从等待方法返回前必须
//获取与Condition相关联的锁,功能与notifyAll()相同
void signalAll();
}

关于Condition的实现类是AQS的内部类ConditionObject,关于这点我们稍后分析,这里先来看一个Condition的使用案例,即经典消费者生产者模式

Condition的使用案例-生产者消费者模式

这里我们通过一个卖烤鸭的案例来演示多生产多消费者的案例,该场景中存在两条生产线程t1和t2,用于生产烤鸭,也存在两条消费线程t3,t4用于消费烤鸭,4条线程同时执行,需要保证只有在生产线程产生烤鸭后,消费线程才能消费,否则只能等待,直到生产线程产生烤鸭后唤醒消费线程,注意烤鸭不能重复消费。ResourceByCondition类中定义product()和consume()两个方法,分别用于生产烤鸭和消费烤鸭,并且定义ReentrantLock锁,用于控制product()和consume()的并发,由于必须在烤鸭生成完成后消费线程才能消费烤鸭,否则只能等待,因此这里定义两组Condition对象,分别是producer_con和consumer_con,前者拥有控制生产线程,后者拥有控制消费线程,这里我们使用一个标志flag来控制是否有烤鸭,当flag为true时,代表烤鸭生成完毕,生产线程必须进入等待状态同时唤醒消费线程进行消费,消费线程消费完毕后将flag设置为false,代表烤鸭消费完成,进入等待状态,同时唤醒生产线程生产烤鸭,具体代码如下

package com.zejian.concurrencys;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; //公共资源类,线程获取这个类调用它的生产和消费方法
public class ResourceByCondition {
private String name; //商品名称
private int count = 1; //商品编号
private boolean flag = false; //是否有烤鸭 //创建一个锁对象。
Lock lock = new ReentrantLock(); //通过已有的锁获取两组监视器,一组监视生产者,一组监视消费者。
Condition producer_con = lock.newCondition();
Condition consumer_con = lock.newCondition(); /**
* 生产
* @param name
*/
public void product(String name)
{
lock.lock(); //上锁,避免一个商品生产了两次或者被消费了两次(避免两个线程count++结果只加了1的情况) try
{
while(flag){ //有烤鸭
try{producer_con.await();}catch(InterruptedException e){} //有烤鸭,则加入producer_con对应的等待队列
}
this.name = name + count;
count++;
System.out.println(Thread.currentThread().getName()+"...生产者5.0..."+this.name);
flag = true; //使消费者线程可以跳过while(!flag)运行下去
consumer_con.signal();//直接唤醒消费线程
}
finally
{
lock.unlock(); //解锁,以便让下一个线程可以执行。
}
} /**
* 消费
*/
public void consume()
{
lock.lock();
try
{
while(!flag){ //没有烤鸭
try{consumer_con.await();}catch(InterruptedException e){} //没有烤鸭,则加入consumer_con对应的等待队列
}
System.out.println(Thread.currentThread().getName()+"...消费者.5.0......."+this.name);//消费烤鸭1
flag = false;
producer_con.signal();//直接唤醒生产线程
}
finally
{
lock.unlock();
}
}
}

执行代码

package com.zejian.concurrencys;

public class Mutil_Producer_ConsumerByCondition {

    public static void main(String[] args) {
ResourceByCondition r = new ResourceByCondition();
Mutil_Producer pro = new Mutil_Producer(r);
Mutil_Consumer con = new Mutil_Consumer(r);
//生产者线程
Thread t0 = new Thread(pro);
Thread t1 = new Thread(pro);
//消费者线程
Thread t2 = new Thread(con);
Thread t3 = new Thread(con);
//启动线程
t0.start();
t1.start();
t2.start();
t3.start();
}
} /**
* @decrition 生产者线程
*/
class Mutil_Producer implements Runnable {
private ResourceByCondition r; Mutil_Producer(ResourceByCondition r) { //传入Resource资源对象
this.r = r;
} public void run() {
while (true) { //while(true)记住格式,无线循环
r.product("北京烤鸭");
}
}
} /**
* @decrition 消费者线程
*/
class Mutil_Consumer implements Runnable {
private ResourceByCondition r; Mutil_Consumer(ResourceByCondition r) {
this.r = r;
} public void run() {
while (true) { // while(true)记住格式,无线循环
r.consume();
}
}
}

正如代码所示,我们通过两者Condition对象单独控制消费线程与生产消费,这样可以避免消费线程在唤醒线程时唤醒的还是消费线程,如果是通过synchronized的等待唤醒机制实现的话,就可能无法避免这种情况,毕竟同一个锁,对于synchronized关键字来说只能有一组等待唤醒队列,而不能像Condition一样,同一个锁拥有多个等待队列。synchronized的实现方案如下,

public class KaoYaResource {

    private String name;
private int count = 1;//烤鸭的初始数量
private boolean flag = false;//判断是否有需要线程等待的标志
/**
* 生产烤鸭
*/
public synchronized void product(String name){
while(flag){
//此时有烤鸭,等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.name=name+count;//设置烤鸭的名称
count++;
System.out.println(Thread.currentThread().getName()+"...生产者..."+this.name);
flag=true;//有烤鸭后改变标志
notifyAll();//通知消费线程可以消费了
} /**
* 消费烤鸭
*/
public synchronized void consume(){
while(!flag){//如果没有烤鸭就等待
try{this.wait();}catch(InterruptedException e){}
}
System.out.println(Thread.currentThread().getName()+"...消费者........"+this.name);//消费烤鸭1
flag = false;
notifyAll();//通知生产者生产烤鸭
}
}

为什么要使用synchronized,多个线程访问修改KaoYaResource中的count,是线程不安全的

如上代码,在调用notify()或者 notifyAll()方法时,由于等待队列中同时存在生产者线程和消费者线程,所以我们并不能保证被唤醒的到底是消费者线程还是生产者线程,而Codition则可以避免这种情况。嗯,了解完Condition的使用方式后,下面我们将进一步探讨Condition背后的实现机制

生产者消费者模型的三种实现方式:

1.Synchronized,wait,notifyAll

2.Lock,Condition ,await,signal

3.BlockingQueue

可参看生产者-消费者模型的三种实现方式:https://www.cnblogs.com/twoheads/p/10137263.html

package producerConsumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; //使用阻塞队列BlockingQueue解决生产者消费者
public class BlockingQueueConsumerProducer {
public static void main(String[] args) {
Resource3 resource = new Resource3();
//生产者线程
ProducerThread3 p = new ProducerThread3(resource);
//多个消费者
ConsumerThread3 c1 = new ConsumerThread3(resource);
ConsumerThread3 c2 = new ConsumerThread3(resource);
ConsumerThread3 c3 = new ConsumerThread3(resource); p.start();
c1.start();
c2.start();
c3.start();
}
}
/**
* 消费者线程
* @author tangzhijing
*
*/
class ConsumerThread3 extends Thread {
private Resource3 resource3; public ConsumerThread3(Resource3 resource) {
this.resource3 = resource;
//setName("消费者");
} public void run() {
while (true) {
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource3.remove();
}
}
}
/**
* 生产者线程
* @author tangzhijing
*
*/
class ProducerThread3 extends Thread{
private Resource3 resource3;
public ProducerThread3(Resource3 resource) {
this.resource3 = resource;
//setName("生产者");
} public void run() {
while (true) {
try {
Thread.sleep((long) (1000 * Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
resource3.add();
}
}
}
class Resource3{
private BlockingQueue resourceQueue = new LinkedBlockingQueue(10);
/**
* 向资源池中添加资源
*/
public void add(){
try {
resourceQueue.put(1);
System.out.println("生产者" + Thread.currentThread().getName()
+ "生产一件资源," + "当前资源池有" + resourceQueue.size() +
"个资源");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 向资源池中移除资源
*/
public void remove(){
try {
resourceQueue.take();
System.out.println("消费者" + Thread.currentThread().getName() +
"消耗一件资源," + "当前资源池有" + resourceQueue.size()
+ "个资源");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

Condition的实现原理

Condition的具体实现类是AQS的内部类ConditionObject,前面我们分析过AQS中存在两种队列,一种是同步队列,一种是等待队列,而等待队列就相对于Condition而言的。注意在使用Condition前必须获得锁,同时在Condition的等待队列上的结点与前面同步队列的结点是同一个类即Node,其结点的waitStatus的值为CONDITION。在实现类ConditionObject中有两个结点分别是firstWaiter和lastWaiter,firstWaiter代表等待队列第一个等待结点,lastWaiter代表等待队列最后一个等待结点,如下

public class ConditionObject implements Condition, java.io.Serializable {
//等待队列第一个等待结点
private transient Node firstWaiter;
//等待队列最后一个等待结点
private transient Node lastWaiter;
//省略其他代码.......
}

每个Condition都对应着一个等待队列!!,也就是说如果一个锁上创建了多个Condition对象,那么也就存在多个等待队列。等待队列是一个FIFO的队列,在队列中每一个节点都包含了一个线程的引用,而该线程就是Condition对象上等待的线程。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。Condition中的等待队列模型如下

【1】【JUC】Condition和生产者消费者模型

正如图所示,Node节点的数据结构,在等待队列中使用的变量与同步队列是不同的,Condtion中等待队列的结点只有直接指向的后继结点并没有指明前驱结点,而且使用的变量是nextWaiter而不是next,这点我们在前面分析结点Node的数据结构时讲过。firstWaiter指向等待队列的头结点,lastWaiter指向等待队列的尾结点,等待队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。再次强调每个Codition对象对于一个等待队列,也就是说AQS中只能存在一个同步队列,但可拥有多个等待队列。下面从代码层面看看被调用await()方法(其他await()实现原理类似)的线程是如何加入等待队列的,而又是如何从等待队列中被唤醒的

public final void await() throws InterruptedException {
//判断线程是否被中断
if (Thread.interrupted())
throw new InterruptedException();
//创建新结点加入等待队列并返回
Node node = addConditionWaiter();
//释放当前线程锁即释放同步状态
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断结点是否同步队列(SyncQueue)中,即是否被唤醒
while (!isOnSyncQueue(node)) {
//挂起线程
LockSupport.park(this);
//判断是否被中断唤醒,如果是退出循环。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// clean up if cancelled
if (node.nextWaiter != null)
//清理等待队列中不为CONDITION状态的结点
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

执行addConditionWaiter()添加到等待队列。

private Node addConditionWaiter() {
Node t = lastWaiter;
// 判断是否为结束状态的结点并移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建新结点状态为CONDITION
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//加入等待队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

V

await()方法主要做了3件事,一是调用addConditionWaiter()方法将当前线程封装成node结点加入等待队列,二是调用fullyRelease(node)方法释放同步状态并唤醒后继结点的线程。三是调用isOnSyncQueue(node)方法判断结点是否在同步队列中(即await后一直卡在这里,while循环判断是否已经被其他线程signal唤醒),如果同步队列中没有该结点就直接挂起该线程,需要明白的是如果线程被唤醒后就调用acquireQueued(node, savedState)执行自旋操作争取锁,即当前线程结点从等待队列转移到同步队列并开始努力获取锁。

接着看看唤醒操作singal()方法

public final void signal() {
//判断是否持有独占锁,如果不是抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//唤醒等待队列第一个结点的线程
if (first != null)
doSignal(first);
}

这里signal()方法做了两件事,一是判断当前线程是否持有独占锁,没有就抛出异常,从这点也可以看出只有独占模式先采用等待队列,而共享模式下是没有等待队列的,也就没法使用Condition。二是唤醒等待队列的第一个结点,即执行doSignal(first)

private void doSignal(Node first) {
do {
//移除条件等待队列中的第一个结点,
//如果后继结点为null,那么说没有其他结点将尾结点也设置为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//如果被通知节点没有进入到同步队列并且条件等待队列还有不为空的节点,则继续循环通知后续结点
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
} //transferForSignal方法
final boolean transferForSignal(Node node) {
//尝试设置唤醒结点的waitStatus为0,即初始化状态
//如果设置失败,说明当期结点node的waitStatus已不为
//CONDITION状态,那么只能是结束状态了,因此返回false
//返回doSignal()方法中继续唤醒其他结点的线程,注意这里并
//不涉及并发问题,所以CAS操作失败只可能是预期值不为CONDITION,
//而不是多线程设置导致预期值变化,毕竟操作该方法的线程是持有锁的。
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false; //加入同步队列并返回前驱结点p
Node p = enq(node);
int ws = p.waitStatus;
//判断前驱结点是否为结束结点(CANCELLED=1)或者在设置
//前驱节点状态为Node.SIGNAL状态失败时,唤醒被通知节点代表的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//唤醒node结点的线程
LockSupport.unpark(node.thread);
return true;
}

注释说得很明白了,这里我们简单整体说明一下,doSignal(first)方法中做了两件事,从条件等待队列移除被唤醒的节点,然后重新维护条件等待队列的firstWaiter和lastWaiter的指向。二是将从等待队列移除的结点加入同步队列(在transferForSignal()方法中完成的),如果进入到同步队列失败并且条件等待队列还有不为空的节点,则继续循环唤醒后续其他结点的线程。到此整个signal()的唤醒过程就很清晰了,即signal()被调用后,先判断当前线程是否持有独占锁,如果有,那么唤醒当前Condition对象中等待队列的第一个结点的线程,并从等待队列中移除该结点,移动到同步队列中,如果加入同步队列失败,那么继续循环唤醒等待队列中的其他结点的线程,如果成功加入同步队列,那么如果其前驱结点是否已结束或者设置前驱节点状态为Node.SIGNAL状态失败,则通过LockSupport.unpark()唤醒被通知节点代表的线程,到此signal()任务完成,注意被唤醒后的线程,将从前面的await()方法中的while循环中退出,因为此时该线程的结点已在同步队列中,那么while (!isOnSyncQueue(node))将不在符合循环条件,进而调用AQS的acquireQueued()方法加入获取同步状态的竞争中,这就是等待唤醒机制的整个流程实现原理,流程如下图所示(注意无论是同步队列还是等待队列使用的Node数据结构都是同一个,不过是使用的内部变量不同罢了)

【1】【JUC】Condition和生产者消费者模型

转自:https://blog.csdn.net/javazejian/article/details/75043422