Java之Object对象中的wait()和notifyAll()用法

时间:2023-03-10 03:39:08
Java之Object对象中的wait()和notifyAll()用法

用一个例子来说明Object对象中的wait方法和notifyAll方法的使用。

首先定义一个消息类,用于封装数据,以供读写线程进行操作:

 /**
* 消息
*
* @author syj
*/
public class Message { private String msg; public String getMsg() {
return msg;
} public void setMsg(String msg) {
this.msg = msg;
}
}

创建一个读线程,从Message对象中读取数据,如果没有数据,就使用 wait() 方法一直阻塞等待结果(等待后面的写线程写入数据):

 /**
* 读线程
*
* @author syj
*/
public class Reader implements Runnable { private Message message; public Reader(Message message) {
this.message = message;
} @Override
public void run() {
synchronized (message) {
try {
// 务必加上该判断,否则可能会因某个读线程在写线程的 notifyAll() 之后执行,
// 这将导致该读线程永远无法被唤醒,程序会一直被阻塞
if (message.getMsg() == null) {
message.wait();// 等待被 message.notify() 或 message.notifyAll() 唤醒
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// 读取 message 对象中的数据
System.out.println(Thread.currentThread().getName() + " - " + message.getMsg());
}
}
}

创建一个写线程,往Message对象中写数据,写入成功就调用 message.notifyAll() 方法来唤醒在 message.wait() 上阻塞的线程(上面的读线程将被唤醒,读线程解除阻塞继续执行):

 import java.util.UUID;

 /**
* 写线程
*
* @author syj
*/
public class Writer implements Runnable { private Message message; public Writer(Message message) {
this.message = message;
} @Override
public void run() {
synchronized (message) {
try {
Thread.sleep(1000L);// 模拟业务耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
// 向 message 对象中写数据
message.setMsg(Thread.currentThread().getName() + ":" + UUID.randomUUID().toString().replace("-", ""));
message.notifyAll();// 唤醒所有 message.wait()
}
}
}

注意,读线程的等待和写线程的唤醒,必须调用同一个对象上的wait或notifyAll方法,并且对这两个方法的调用一定要放在synchronized块中。

这里的读线程和写线程使用的同一个对象是message,读线程调用message.wait()方法进行阻塞,写线程调用message.notifyAll()方法唤醒所有(因为调用message.wait()方法的可能会有对个线程,在本例中就有两个读线程调用了message.wait() 方法)读线程的阻塞。

写一个测试类,启动两个读线程,从Message对象中读取数据,再启动一个写线程,往Message对象中写数据:

 /**
* 测试 Object 对象中的 wait()/notifyAll() 用法
*
* @author syj
*/
public class LockApp {
public static void main(String[] args) {
Message message = new Message();
new Thread(new Reader(message), "R1").start();// 读线程 名称 R1
new Thread(new Reader(message), "R2").start();// 读线程 名称 R2
new Thread(new Writer(message), "W").start();// 写线程 名称 W
}
}

控制台打印结果:

R2 - W:4840dbd6b312489a9734414dd99a4bcb
R1 - W:4840dbd6b312489a9734414dd99a4bcb

其中R2代表第二个读线程,R2是这个读线程的名字。R1是第一个读线程,线程名叫R2。后面的uui就是模拟的异步执行结果了,W代表写线程的名字,表示数据是由写线程写入的。 由于我们只开启一个写线程,所有两条数据的uuid是同一个,只不过被两个读线程都接收到了而已。

抛出一个问题:Object对象的这个特性有什么用呢?

它比较适合用在同步等待异步处理结果的场景中。比如,在RPC框架中,Netty服务器通常返回结果是异步的,而Netty客户端想要拿到这个异步结果进行处理,该怎么做呢?

下面使用伪代码来模拟这个场景:

 import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; /**
* 使用 Object对象的 wait() 和 notifyAll() 实现同步等待异步结果
*
* @author syj
*/
public class App { // 用于存放异步结果, key是请求ID, value是异步结果
private static ConcurrentHashMap<String, String> resultMap = new ConcurrentHashMap<>();
private Object lock = new Object(); /**
* 写数据到 resultMap,写入成功唤醒所有在 lock 对象上等待的线程
*
* @param requestId
* @param message
*/
public void set(String requestId, String message) {
resultMap.put(requestId, message);
synchronized (lock) {
lock.notifyAll();
}
} /**
* 从 resultMap 中读数据,如果没有数据则等待
*
* @param requestId
* @return
*/
public String get(String requestId) {
synchronized (lock) {
try {
if (resultMap.get(requestId) == null) {
lock.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return resultMap.get(requestId);
} /**
* 移除结果
*
* @param requestId
*/
public void remove(String requestId) {
resultMap.remove(requestId);
} /**
* 测试方法
*
* @param args
*/
public static void main(String[] args) {
// 请求唯一标识
String requestId = UUID.randomUUID().toString();
App app = new App();
try {
// 模拟Netty服务端异步返回结果
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000L);// 模拟业务耗时
} catch (InterruptedException e) {
e.printStackTrace();
}
// 写入数据
app.set(requestId, UUID.randomUUID().toString().replace("-", ""));
}
}).start(); // 模拟Netty客户端同步等待读取Netty服务器端返回的结果
String message = app.get(requestId);
System.out.println(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
// 结果不再使用,一定要移除,以防止内容溢出
app.remove(requestId);
}
}
}

这里定义了一个静态的ConcurrentHashMap容器,来存放Netty服务器返回的异步结果,key是请求的id,value就是异步执行结果。

调用set方法可以往容器中写入数据(写入请求ID和相对应的执行结果),调用get方法可以从容器读取数据(根据请求ID获取对应的执行结果)。

get方法中调用lock对象的wait方法进行阻塞等待结果,set方法往容器中写入结果之后,紧接着调用的是同一个lock对象的notifyAll方法来唤醒该lock对象上的所有wait()阻塞线程。

以此来达到同步等待获取异步执行结果的目的。

参考文章:https://cloud.tencent.com/developer/article/1155102