Callable接口和FutureTask实现类,是JUC(Java Util Concurrent)包中很重要的两个技术实现,它们使获取多线程运行结果成为可能。它们底层的实现,就是基于接口回调技术。接口回调,许多程序员都耳熟能详,这种技术被广泛应用于异步模块的开发中。它的实现原理并不复杂,但是对初学者来说却并不友好,其中的一个原因是它的使用场景和处理手段,对习惯了单线程开发的初学者来说有点绕。而各种文章或书籍,在解释这一个问题的时候,往往忽视了使用场景,而举一些小明坐车、A和B等等的例子,初学者看完之后往往更迷糊。
本文立足于此,就从多线程中线程结果获取这一需求场景出发,逐步说明接口回调及其在JUC中的应用。
需要了解Java多线程的底层运行机制,可以看这一篇:基于JVM原理、JMM模型和CPU缓存模型深入理解Java并发编程
线程结果获取
习惯了单线程开发的程序员,在异步编程中最难理解的一点,就是如何从线程运行结果返回信息,因为run和start方法本身是没有返回值的。一个基本的方法是,使用一个变量暂存运行结果,另外提供一个公共方法来返回这个变量。实现代码如下:
/*
* 设计可以返回运行结果的线程
* 定义一个线程读取文件内容, 使用字符串存取结果并返回主线程
*/
public class ReturnDigestTest extends Thread{
//定义文件名
private String fileName;
//定义一个字符串对象result, 用于存取线程执行结果
private String result; public ReturnDigestTest(String fileName) {
this.fileName = fileName;
}
//run方法中读取本目录下文件, 并存储至result
@Override
public void run() {
try (FileInputStream fis = new FileInputStream(fileName)){
byte[] buffer = new byte[];
int hasRead = ;
while ((hasRead = fis.read(buffer)) > ) {
result = new String(buffer, , hasRead);
}
} catch (IOException e) {
e.printStackTrace();
}
}
//定义返回result结果的方法
public String getResult() {
return result;
}
public static void main(String[] args) throws InterruptedException {
//测试, 在子线程中执行读取文件, 主线程返回
ReturnDigestTest returnDigestTest = new ReturnDigestTest("test.txt");
returnDigestTest.start();
//以下结果返回null. 因为getResult方法执行的时候, 子线程可能还没结束
System.out.println(returnDigestTest.getResult());
}
}
运行结果会输出一个null,原因在于读取文件的线程需要执行时间,所以很可能到主线程调用getResult方法的时候,子线程还没结束,结果就为null了。
如果在上面代码第35行,增加TimeUnit.SECONDS.sleep(5); 使主线程休眠5秒钟,你会发现结果正确返回。
竞态条件
在多线程环境下的实际开发场景中,更为常见的情形是,业务线程需要不断循环获取多个线程运行的返回结果。如果按照上述思路开发,那可能的结果为null,也可能导致程序挂起。上述方法是否成功,取决于竞态条件(Race Condition),包括线程数、CPU数量、CPU运算速度、磁盘读取速度、JVM线程调度算法。
轮询
作为对上述方法的一个优化,可以让主线程定期询问返回状态,直到结果非空在进行获取,这就是轮询的思路。沿用上面的例子,只需要把36行修改如下即可:
//使用轮询, 判断线程返回结果是否为null
while (true) {
if (returnDigestTest.getResult() != null) {
System.out.println(returnDigestTest.getResult());
break;
}
}
但是,这个方法仍然不具有普适性,在有些JVM,主线程会占用几乎所有运行时间,而导致子线程无法完成工作。
即便不考虑这个因素,这个方法仍然不理想,它使得CPU运行时间被额外占用了。就好像一个搭公交的小孩,每一站都在问:请问到站了吗?因此,比较理想的方法,是让子线程在它完成任务后,通知主线程,这就是回调方法。
接口回调的应用
在异步编程中,回调的意思是,一个线程在执行中或完毕后,通知另外一个线程,返回一些消息。而接口回调,则是充分利用了Java多态的特征,使用接口作为回调方法的引用。
使用接口回调技术来优化上面的问题,可以设计一个实现Runnable接口的类,一个回调方法的接口,以及一个回调方法接口的实现类(main方法所在类),具体实现如下
实现Runnable的类
/*
* 使用接口回调, 实现线程执行结果的返回
*/
public class CallbackDigest implements Runnable{
private String fileName;
private String result;
//定义回调方法接口的引用
private CallbackUserInterface cui;
public CallbackDigest(String fileName, CallbackUserInterface cui) {
this.fileName = fileName;
this.cui = cui;
}
@Override
public void run() {
try (FileInputStream fis = new FileInputStream(fileName)){
byte[] buffer = new byte[1024];
int hasRead = 0;
while((hasRead = fis.read(buffer)) > 0) {
result = new String(buffer, 0, hasRead);
}
//通过回调接口引用, 调用了receiveResult方法, 可以在主线程中返回结果.
//此处利用了多态
cui.receiveResult(result, fileName);
} catch (IOException e) {
e.printStackTrace();
}
}
}
回调方法接口
public interface CallbackUserInterface {
//只定义了回调方法, 传入一个待读取的文件名参数, 和返回结果
public void receiveResult(String result, String fileName);
}
回调方法接口实现类
public class CallbackTest implements CallbackUserInterface {
//实现回调方法
@Override
public void receiveResult(String result, String fileName) {
System.out.println("文件" + fileName + "的内容是: \n" + result);
} public static void main(String[] args) {
//新建回调接口引用, 指向实现类的对象
CallbackUserInterface test = new CallbackTest();
new Thread(new CallbackDigest("test.txt", test)).start();
}
}
接口回调的技术主要有4个关键点:
1. 发出信息的线程类:定义回调方法接口的引用,在构造方法中初始化。
2. 发出信息的线程类:使用回调方法接口的引用, 来调用回调方法。
3. 收取信息的线程类:实现回调接口,新建回调接口的引用,指向该类的对象。
4. 发出信息的线程类:新建线程类对象是,传入3中新建的实现类对象。
Callable和FutureTask的使用
Callable的底层实现类似于一个回调接口,而FutureTask类似于本例子中读取文件内容的线程实现类。因为FutureTask实现了Runnable接口,所以它的实现类是可以多线程的,而内部就是调用了Callable接口实现类的回调方法,从而实现线程结果的返回机制。demo代码如下:
public class TestCallable implements Callable<Integer>{
//实现Callable并重写call方法作为线程执行体, 并设置返回值1
@Override
public Integer call() throws Exception {
System.out.println("Thread is running...");
Thread.sleep(3000);
return 1;
} public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建Callable实现类的对象
TestCallable tc = new TestCallable();
//创建FutureTask类的对象
FutureTask<Integer> task = new FutureTask<>(tc);
//把FutureTask实现类对象作为target,通过Thread类对象启动线程
new Thread(task).start();
System.out.println("do something else...");
//通过get方法获取返回值
Integer integer = task.get();
System.out.println("The thread running result is :" + integer);
}
}