Java多线程之 Callable、Future和FutureTask

时间:2023-01-09 17:29:23


最近在研究Android AsyncTask 源码的时候,发现用到了很多JAVA SE 多线程的知识,于是回过头来查阅JDK 文档复习了一下Java多线程的相关知识,做一个笔记方便以后查找。


JDK API 中关于Callable介绍

Java多线程之 Callable、Future和FutureTask


Callable 接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是Runnable 不会返回结果,并且无法抛出经过检查的异常。 


我们可以通过 ExecutorService 的submit 方法来提交一个Runnable或者Callable任务,并且会返回一个Future对象。


Java多线程之 Callable、Future和FutureTask



Future 表示异步计算的结果,我们可以通过 Future 对象来获取计算的结果 或者是 取消正在进行的任务。


Java多线程之 Callable、Future和FutureTask


下面通过一个例子来比较Runnable和Callable的不同之处。


package com.example.test.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableTest {

/**
* @param args
*/
public static void main(String[] args) {

// testRunnable();
testCallable();

}

private static void testCallable() {

ExecutorService pool = Executors.newFixedThreadPool(3);

CalculateCallable task1 = new CalculateCallable(0);
CalculateCallable task2 = new CalculateCallable(1);
CalculateCallable task3 = new CalculateCallable(3);

try {
// 提交并执行任务,任务启动时返回了一个 Future对象,
// 如果想得到任务执行的结果或者是异常可对这个Future对象进行操作
Future<Integer> future1 = pool.submit(task1);
// 获得第一个任务的结果,如果调用get方法,当前线程会等待任务执行完毕后才往下执行
System.out.println("task1 get: " + future1.get());

Future<Integer> future2 = pool.submit(task2);
// System.out.println("task2 get: " + future2.get(2000,
// TimeUnit.MILLISECONDS));
// 等待5秒后,再停止第二个任务,因为第二个任务进行的是无限循环
Thread.sleep(8000);
System.out.println("task2 cancel: " + future2.cancel(true));

Future<Integer> future3 = pool.submit(task3);
System.out.println("task3 get: " + future3.get());

} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

// 停止任务执行服务
pool.shutdownNow();
}

private static void testRunnable() {

CalculateRunnable task1 = new CalculateRunnable(0);
CalculateRunnable task2 = new CalculateRunnable(1);
CalculateRunnable task3 = new CalculateRunnable(3);

ExecutorService pool = Executors.newFixedThreadPool(3);

try {
Future<?> future1 = pool.submit(task1);
System.out.println("task1 get: " + future1.get());

Future<?> future2 = pool.submit(task2);
// System.out.println("task2 get: " + future2.get());

// 等待5秒后,再停止第二个任务
Thread.sleep(8000);
System.out.println("task2 cancel: " + future2.cancel(true));

Future<?> future3 = pool.submit(task3);
System.out.println("task3 get: " + future3.get());

} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

// 停止任务执行服务
pool.shutdownNow();
}

public static class CalculateCallable implements Callable<Integer> {

private int param = -1;

public CalculateCallable(int param) {
this.param = param;
}

@Override
public Integer call() throws Exception {

if (this.param == 0) {

System.out.println("no loop********" + param);
return 0;
} else if (this.param == 1) {

// 如果flag的值为1,做一个无限循环
try {
while (true) {
System.out.println("while looping********" + param);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
return 1;
} else {
throw new Exception("illegal argument!"+param);
}
}
}

public static class CalculateRunnable implements Runnable {

private int param = -1;

public CalculateRunnable(int param) {
this.param = param;
}

@Override
public void run() {

if (this.param == 0) {
System.out.println("param=" + param + "***over");
} else if (this.param == 1) {

// 如果flag的值为1,做一个无限循环
try {
while (true) {
System.out.println("looping********" + param);
Thread.sleep(2000);
}
} catch (InterruptedException e) {
System.out.println("Interrupted");
}
System.out.println("param=" + param + "***over");
} else {
System.out.println("illegal argument!"+param);
}
}
}
}


FutureTask,可取消的异步计算。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法

Java多线程之 Callable、Future和FutureTask




package com.example.test.callable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskTest {

/**
* @param args
*/
public static void main(String[] args) {

testFutureTask();
}

private static void testFutureTask() {

Worker worker = new Worker(); // 工作线程

FutureTask<Integer> ft = new FutureTask<Integer>(worker);

new Thread(ft).start();

while (!ft.isDone()) {
try {
System.out.println("monitor worker thread...");

Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

try {
int amount = ft.get(); //返回值
System.out.println("amount="+amount);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}

public static class Worker implements Callable<Integer> {

private int flag = 100;
private int retVal = 0;

@Override
public Integer call() throws Exception {
while (flag > 0) {
System.out.println("I'm working......"+flag);

flag--;
retVal++;
Thread.sleep(1000);
}
return retVal;
}
}

}