1.Future和Callable
Future是一个接口表示异步计算的结果,它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。Future提供了get()、cancel()、isCancel()、isDone()四种方法,表示Future有三种功能:
1、判断任务是否完成
2、中断任务
3、获取任务执行结果
Callable和Runnable差不多,两者都是为那些其实例可能被另一个线程执行的类而设计的,最主要的差别在于Runnable不会返回线程运算结果,Callable可以(假如线程需要返回运行结果)
public class CallableAndFuture {
public static class CallableThread implements Callable<String> { @Override
public String call() throws Exception {
Thread.sleep(3000);
System.out.println("方法A过了3秒钟才返回数据");
return "A返回结果";
} } public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
CallableThread cThread = new CallableThread();
Future<String> submit = newCachedThreadPool.submit(cThread);
System.out.println(submit.get());
} }
输出:
方法A过了3秒才返回结果
2.FutureTask
先上个FutureTask的类图
FutureTask实现了Runnable和Future,实际上是这两个接口的包装器,所以FutureTask既是Runnable也是Future
我们先写个基本的例子看看FutureTask的使用
Callable<Integer> myComputation = ...;
FutureTask<Integer> task = new FutureTask<Integer>(myComputation);
Thread t = new Thread(task);
t.start();
...
Integer result = task.get(); //获取结果
再看下他的构造方法
相比第一个构造方法,第二个构造方法里面把Runnable转成了callable,所以两个构造方法实现的功能其实都差不多。
FutureTask里面最重要的方法就是get方法了,该方法实际上是对Future的get的实现,下面我们研究下FutureTask的代码
1.FutureTask的状态转换过程:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
2.具体的get方法如下
实现阻塞效果的是awaitDone,具体如下
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//先定义一堆变量
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//注意,这个是死循环,阻塞有他一半的功劳
for (;;) {
//刚开始线程还没加入到阻塞队列中这段代码是没有用的,
//这里的作用是当线程已加入队列后,这时候线程被中断了,那就把线程从队列中移除
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
} int s = state;
//任务结束,返回状态
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//将要结束,那就再等等呗
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//初始化
else if (q == null)
q = new WaitNode();
//还没加入队列,那就用CAS加进去呗
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//将线程挂起来,这里就阻塞了
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
上述讲述的是FutureTask的阻塞实现,其实还有个疑惑,当线程运行完毕,阻塞会自动解除获取结果,这究竟是怎么实现的呢
我们看看线程的run方法
这里有个ran变量,当获取到执行结果后ran变量为true,再执行set方法
这个可以看出正常情况下FutureTask的状态变化是
NEW -> COMPLETING -> NORMAL
我们再看出 finishCompletion
哈,找到了,类似于AQS的共享锁,这里也做了持续的唤醒