【深入分析Java多线程】(6)线程间通信之wait()、notify()、join()

时间:2020-12-10 17:21:17

Wait-Notify场景

典型的Wait-Notify场景一般与以下两个内容相关:
1. 状态变量(State Variable)
当线程需要wait的时候,总是因为一些条件得不到满足导致的。例如往队列里填充数据,当队列元素已经满时,线程就需要wait停止运行。当队列元素有空缺时,再继续自己的执行。
2. 条件断言(Condition Predicate)
当线程确定是否进入wait或者是从notify醒来的时候是否继续往下执行,大部分都要测试状态条件是否满足。例如,往队列里添加元素,队列已满,于是阻塞当前线程,当有其他线程从队列里取走了元素,就通知在等待的线程“队列有剩余空间,可以往里添加元素了”。这时,等待添加元素的进程就会被唤醒,然后判断一下当前队列是否真的有剩余空间,如果真的有剩余空间,就将元素添加进去,如果没有,则继续阻塞等待下次唤醒。
3. 条件队列(Condition Queue)
每个对象都有一个内置的条件队列,当一个线程在该对象锁上调用wait函数的时候,就会将该线程加入到该对象的条件队列中。

注意
wait与notify是Java同步机制中的重要组成部分。结合与synchronized关键字使用,可以建立很多优秀的同步模型,例如生产者-消费者模型。但是在使用wait()、notify()、notifyAll()函数的时候,需要特别注意以下几点:

    1、wait()、notify()、notifyAll()方法不属于Thread类,而是属于Object基础类,也就是说每个对象都有wait()、notify()、notifyAll()的功能。因为每个对象都有锁,锁是每个对象的基础,因此操作锁的方法也是最基础的。
   2、 调用obj的wait(), notify()方法前,必须获得obj锁,也就是必须写在synchronized(obj){...} 代码段内。
   3、 调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj){...} 代码段内唤醒线程A。
   4、 当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。
   5、 如果线程A1,A2,A3都在obj.wait(),则线程B调用obj.notify()只能唤醒线程A1,A2,A3中的一个(具体哪一个由JVM决定)。
    6、如果线程B调用obj.notifyAll()则能全部唤醒等待的线程A1,A2,A3,但是等待的线程要继续执行obj.wait()的下一条语句,必须获得obj锁。因此,线程A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。
    7、当线程B调用obj.notify()或者obj.notifyAll()的时候,线程B正持有obj锁,因此,线程A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到线程B退出synchronized代码块,释放obj锁后,线程A1,A2,A3中的一个才有机会获得对象锁并得以继续执行。


示例代码
线程的wait操作的典型代码结构如下:

public void test() throws InterruptedException { 
synchronized(obj) {
while (! contidition) {
obj.wait();
}
}
}

为什么obj.wait()操作必须位于循环中呢?有以下几个主要原因:
1. 一个对象锁可能用于保护多个状态变量,当它们都需要wait-notify操作时,如果不将wait放到while循环中就会有问题。例如,某对象锁obj保护两种状态变量a和b,当a的条件断言不成立时发生了wait操作,当b的条件断言不成立时也发生了wait操作,两个线程被加入到obj对应的条件队列中。现在若改变状态变量a的某操作发生,在obj上调用了notifyAll操作,则obj对应的条件队列里的所有线程均被唤醒,之前等待a的一个或几个线程去判断a的条件断言可能成立了,但是b对于的条件断言肯定仍不成立,而此时等待b的线程也被唤醒了,所以需要循环判断b的条件断言是否满足,如果不满足,则继续wait。
2. 多个线程wait的同一状态的条件断言。例如,向队列添加元素的场景,当前队列是满的,多个线程想往里面添加元素,于是都wait了。此时,另一个线程从队列里取出了一个元素,调用了notifyAll操作,唤醒了所有线程,但是只有一个线程能够往队列里添加一个元素,其他的仍需要等待。
3. 虚假唤醒。在没有被通知、中断或超时的情况下,线程自动苏醒了。虽然这种情况在实践中很少发生,但是通过循环等待可以杜绝这一情况的发生。


join()

1、使用方式。

join是Thread类的一个方法,启动线程后直接调用,例如:

Thread t = new AThread(); t.start(); t.join();

2、为什么要用join()方法

在很多情况下,主线程生成并起动了子线程,如果子线程里要进行大量的耗时的运算,主线程往往将于子线程之前结束,但是如果主线程处理完其他的事务后,需要用到子线程的处理结果,也就是主线程需要等待子线程执行完成之后再结束,这个时候就要用到join()方法了。

3、join方法的作用

在JDk的API里对于join()方法是:

join

public final void join() throws InterruptedException Waits for this thread to die. Throws: InterruptedException  - if any thread has interrupted the current thread. The interrupted status of the current thread is cleared when this exception is thrown.

即join()的作用是:“等待该线程终止”,这里需要理解的就是该线程是指的主线程等待子线程的终止。也就是在子线程调用了join()方法后面的代码,只有等到子线程结束了才能执行。

4、用实例来理解

写一个简单的例子来看一下join()的用法:

class BThread extends Thread {
public BThread() {
super("[BThread] Thread");
};
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " start.");
try {
for (int i = 0; i < 5; i++) {
System.out.println(threadName + " loop at " + i);
Thread.sleep(1000);
}
System.out.println(threadName + " end.");
} catch (Exception e) {
System.out.println("Exception from " + threadName + ".run");
}
}
}
class AThread extends Thread {
BThread bt;
public AThread(BThread bt) {
super("[AThread] Thread");
this.bt = bt;
}
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " start.");
try {
bt.join();
System.out.println(threadName + " end.");
} catch (Exception e) {
System.out.println("Exception from " + threadName + ".run");
}
}
}
public class TestDemo {
public static void main(String[] args) {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " start.");
BThread bt = new BThread();
AThread at = new AThread(bt);
try {
bt.start();
Thread.sleep(2000);
at.start();
at.join();
} catch (Exception e) {
System.out.println("Exception from main");
}
System.out.println(threadName + " end!");
}
}
打印结果:

main start. //主线程起动,因为调用了at.join(),要等到at结束了,此线程才能向下执行。
[BThread] Thread start.
[BThread] Thread loop at 0
[BThread] Thread loop at 1
[AThread] Thread start. //线程at启动,因为调用bt.join(),等到bt结束了才向下执行。
[BThread] Thread loop at 2
[BThread] Thread loop at 3
[BThread] Thread loop at 4
[BThread] Thread end.
[AThread] Thread end. // 线程AThread在bt.join();阻塞处起动,向下继续执行的结果
main end! //线程AThread结束,此线程在at.join();阻塞处起动,向下继续执行的结果。
修改一下代码:
public class TestDemo {
public static void main(String[] args) {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " start.");
BThread bt = new BThread();
AThread at = new AThread(bt);
try {
bt.start();
Thread.sleep(2000);
at.start();
//at.join(); //在此处注释掉对join()的调用
} catch (Exception e) {
System.out.println("Exception from main");
}
System.out.println(threadName + " end!");
}
}
打印结果:
main start. // 主线程起动,因为Thread.sleep(2000),主线程没有马上结束;
[BThread] Thread start. //线程BThread起动
[BThread] Thread loop at 0
[BThread] Thread loop at 1
main end! // 在sleep两秒后主线程结束,AThread执行的bt.join();并不会影响到主线程。
[AThread] Thread start. //线程at起动,因为调用了bt.join(),等到bt结束了,此线程才向下执行。
[BThread] Thread loop at 2
[BThread] Thread loop at 3
[BThread] Thread loop at 4
[BThread] Thread end. //线程BThread结束了
[AThread] Thread end. // 线程AThread在bt.join();阻塞处起动,向下继续执行的结果


5、从源码看join()方法

在AThread的run方法里,执行了bt.join();,进入看一下它的JDK源码:

public final void join() throws InterruptedException {
join(0L);
}

然后进入join(0L)方法:

public final synchronized void join(long l)
throws InterruptedException
{
long l1 = System.currentTimeMillis();
long l2 = 0L;
if(l < 0L)
throw new IllegalArgumentException("timeout value is negative");
if(l == 0L)
for(; isAlive(); wait(0L));
else
do
{
if(!isAlive())
break;
long l3 = l - l2;
if(l3 <= 0L)
break;
wait(l3);
l2 = System.currentTimeMillis() - l1;
} while(true);
}

单纯从代码上看: * 如果线程被生成了,但还未被起动,isAlive()将返回false,调用它的join()方法是没有作用的。将直接继续向下执行。 * 在AThread类中的run方法中,bt.join()是判断bt的active状态,如果bt的isActive()方法返回false,在bt.join(),这一点就不用阻塞了,可以继续向下进行了。从源码里看,wait方法中有参数,也就是不用唤醒谁,只是不再执行wait,向下继续执行而已。 * 在join()方法中,对于isAlive()和wait()方法的作用对象是个比较让人困惑的问题:

isAlive()方法的签名是:public final native boolean isAlive(),也就是说isAlive()是判断当前线程的状态,也就是bt的状态。

wait()方法在jdk文档中的解释如下:

Causes the current thread to wait until another thread invokes the notify() method or the notifyAll() method for this object. In other words, this method behaves exactly as if it simply performs the call wait(0).

The current thread must own this object's monitor. The thread releases ownership of this monitor and waits until another thread notifies threads waiting on this object's monitor to wake up either through a call to the notify method or the notifyAll method. The thread then waits until it can re-obtain ownership of the monitor and resumes execution.

在这里,当前线程指的是at。

注意:

join(long)与sleep(long)的区别:

join(long)具有释放锁的特点,sleep(long)不释放锁。


实例--两个线程交替打印

package twoT;

public class TwoThread {

static boolean b = true;
static int i = 0;

public static void main(String[] args) {
Object lock1 = new Object();
T3 t3 = new T3(lock1);
T4 t4 = new T4(lock1);
t3.start();
t4.start();
}


public static void print(String name) {
System.out.println(name + ":" + i++);
}

}

class T3 extends Thread {
Object lock;
public T3(Object lock) {
this.lock = lock;
}

@Override
public void run() {
while (true) {
synchronized (lock) {
if (!TwoThread.b) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
TwoThread.print("A");
if (TwoThread.i >= 100) {
break;
}
TwoThread.b = false;
lock.notify();
}
}
}
}


class T4 extends Thread {
Object lock;
public T4(Object lock) {
this.lock = lock;
}

@Override
public void run() {
while (true) {
synchronized (lock) {
if (TwoThread.b) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
TwoThread.print("B");
if (TwoThread.i >= 100) {
break;
}
TwoThread.b = true;
lock.notify();
}
}
}
}