一、概述
FutureTask包装器是一种非常便利的机制,同时实现了Future和Runnable接口。
类图如下:
FutureTask是一种可以取消的异步的计算任务。它的计算是通过Callable实现的,它等价于可以携带结果的Runnable,并且有三个状态:等待、运行和完成。完成包括所有计算以任意的方式结束,包括正常结束、取消和异常。
Future有个get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。
FutureTask有下面几个重要的方法:
1.get()
阻塞一直等待执行完成拿到结果
2.get(int timeout, TimeUnit timeUnit)
阻塞一直等待执行完成拿到结果,如果在超时时间内,没有拿到抛出异常
3.isCancelled()
是否被取消
4.isDone()
是否已经完成
5.cancel(boolean mayInterruptIfRunning)
试图取消正在执行的任务
二、FutureTask的状态转换过程
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
三、FutureTask的执行过程
创建一个futureTask对象task
提交task到调度器executor等待调度或者在另外一个线程中执行task 等待调度中... 如果此时currentThread调取执行结果task.get(),会有几种情况
if task 还没有被executor调度或正在执行中
阻塞当前线程,并加入到一个阻塞链表中waitNode
else if task被其它Thread取消,并取消成功 或task处于中断状态
throw exception
else if task执行完毕,返回执行结果,或执行存在异常,返回异常信息 如果此时有另外一个线程调用task.get() 执行过程同上
四、应用场景
1. Future用于异步获取执行结果或者取消任务。
2. 在高并发场景下确保任务只执行一次。
五、源码分析
1.核心状态
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
2.构造函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
} public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
3.获取执行结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
} public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
4.执行方法
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
5.设置状态
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
} protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
六、代码示例
1. FutureTask执行多任务计算的使用场景
利用FutureTask和ExecutorService,可以用多线程的方式提交计算任务,主线程继续执行其他任务,当主线程需要子线程的计算结果时,在异步获取子线程的执行结果。
public class FutureTaskForMultiCompute {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建任务集合
List<FutureTask<Integer>> taskList = new ArrayList<FutureTask<Integer>>();
// 创建线程池
ExecutorService exec = Executors.newFixedThreadPool(5);
for (int i = 0; i < 10; i++) {
FutureTask<Integer> ft = new FutureTask<>(new ComputeTask(i, ""+i));
taskList.add(ft);
// 提交给线程池执行任务,也可以通过exec.invokeAll(taskList)一次性提交所有任务;
exec.submit(ft);
} System.out.println("所有计算任务提交完毕, 主线程接着干其他事情!"); // 开始统计各计算线程计算结果
Integer totalResult = 0;
for (FutureTask<Integer> ft : taskList) {
totalResult += ft.get();
} // 关闭线程池
exec.shutdown();
System.out.println("多任务计算后的总结果是:" + totalResult);
} }
class ComputeTask implements Callable<Integer> { private Integer result = 0;
private String taskName = ""; public String getTaskName(){
return this.taskName;
} public ComputeTask(Integer iniResult, String taskName){
result = iniResult;
this.taskName = taskName;
System.out.println("生成子线程计算任务: "+taskName);
} @Override
public Integer call() throws Exception {
for (int i = 0; i < 100; i++) {
result =+ i;
}
// 休眠5秒钟,观察主线程行为,预期的结果是主线程会继续执行,到要取得FutureTask的结果是等待直至完成。
Thread.sleep(5000);
System.out.println("子线程计算任务: "+taskName+" 执行完成!");
return result;
} }
2. FutureTask在高并发环境下确保任务只执行一次
在很多高并发的环境下,往往我们只需要某些任务只执行一次。这种使用情景FutureTask的特性恰能胜任。举一个例子,假设有一个带key的连接池,当key存在时,即直接返回key对应的对象;当key不存在时,则创建连接。对于这样的应用场景,通常采用的方法为使用一个Map对象来存储key和连接池对应的对应关系,典型的代码如下面所示:
public class FutureTaskTest {
private Map<String, Connection> connectionPool = new HashMap<String, Connection>();
private ReentrantLock lock = new ReentrantLock(); public Connection getConnection(String key){
try {
lock.lock();
if(connectionPool.containsKey(key)){
return connectionPool.get(key);
}else{
//创建 Connection
Connection conn = createConnection();
connectionPool.put(key, conn);
return conn;
}
} finally{
lock.unlock();
} } //创建Connection
private Connection createConnection(){
return null;
} }
在上面的例子中,我们通过加锁确保高并发环境下的线程安全,也确保了connection只创建一次,然而确牺牲了性能。改用ConcurrentHash的情况下,几乎可以避免加锁的操作,性能大大提高,但是在高并发的情况下有可能出现Connection被创建多次的现象。这时最需要解决的问题就是当key不存在时,创建Connection的动作能放在connectionPool之后执行,这正是FutureTask发挥作用的时机,基于ConcurrentHashMap和FutureTask的改造代码如下:
public class FutureTaskTest {
private ConcurrentHashMap<String,FutureTask<Connection>>connectionPool = new ConcurrentHashMap<String, FutureTask<Connection>>(); public Connection getConnection(String key) throws Exception{
FutureTask<Connection> connectionTask = connectionPool.get(key);
if(connectionTask!=null){
return connectionTask.get();
}
else{
Callable<Connection> callable = new Callable<Connection>(){
@Override
public Connection call() throws Exception {
// TODO Auto-generated method stub
return createConnection();
}
};
FutureTask<Connection> newTask = new FutureTask<Connection>(callable);
connectionTask = connectionPool.putIfAbsent(key, newTask);
if(connectionTask==null){
connectionTask = newTask;
connectionTask.run();
}
return connectionTask.get();
}
} //创建Connection
private Connection createConnection(){
return null;
} }
经过这样的改造,可以避免由于并发带来的多次创建连接及锁的出现。