Java Concurrency in Practice: Callable Asynchronous Callbacks with Future and FutureTask

Date: 2022-03-31 18:02:43

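The Callable interface is similar to Runnable, as the name suggests. A Runnable, however, cannot return a result, and its run() method cannot throw a checked exception. Callable is more capable: its call() method returns a value and may throw an Exception, and that return value can be retrieved through a Future. FutureTask implements both Runnable and Future (through RunnableFuture), so it can be executed by a thread as a Runnable and can also act as the Future that yields the Callable's return value. What is this combination good for? Suppose a value is expensive to compute and is not needed right away. You can hand the computation to another thread, let the current thread carry on with other work, and fetch the value through the Future only when it is actually needed.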
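As a minimal sketch of the combination described above: a Callable is wrapped in a FutureTask, started on a plain Thread, and its result is fetched later with get(). The class name RunnableVsCallableSketch and the helper slowSum are hypothetical names invented for this illustration, and the lambda syntax assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class RunnableVsCallableSketch {

    // Hypothetical helper standing in for an expensive computation.
    static int slowSum(int n) throws InterruptedException {
        Thread.sleep(100); // simulate slow work; a checked exception is allowed here
        int sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }

    static int computeInBackground(int n)
            throws InterruptedException, ExecutionException {
        // Unlike Runnable.run(), Callable.call() returns a value and may throw.
        Callable<Integer> task = () -> slowSum(n);

        // FutureTask is a Runnable (so a Thread can run it) and a Future
        // (so the result can be read from it afterwards).
        FutureTask<Integer> future = new FutureTask<>(task);
        new Thread(future).start();

        // The current thread could do unrelated work here.

        // Blocks only if the background computation has not finished yet.
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(computeInBackground(100)); // prints 5050
    }
}
```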

Future characteristics

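One point to be clear about: java.util.concurrent.Future is an interface that represents the result of an asynchronous computation. It is often described as an asynchronous callback, but a Future never invokes your code when the task completes; the caller obtains the result by blocking in get() or by polling isDone().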

Future interface source

/*
* ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
*
*/

/*
*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*/

package java.util.concurrent;

/**
* A <tt>Future</tt> represents the result of an asynchronous
* computation. Methods are provided to check if the computation is
* complete, to wait for its completion, and to retrieve the result of
* the computation. The result can only be retrieved using method
* <tt>get</tt> when the computation has completed, blocking if
* necessary until it is ready. Cancellation is performed by the
* <tt>cancel</tt> method. Additional methods are provided to
* determine if the task completed normally or was cancelled. Once a
* computation has completed, the computation cannot be cancelled.
* If you would like to use a <tt>Future</tt> for the sake
* of cancellability but not provide a usable result, you can
* declare types of the form {@code Future<?>} and
* return <tt>null</tt> as a result of the underlying task.
*
* <p>
* <b>Sample Usage</b> (Note that the following classes are all
* made-up.) <p>
* <pre> {@code
* interface ArchiveSearcher { String search(String target); }
* class App {
* ExecutorService executor = ...
* ArchiveSearcher searcher = ...
* void showSearch(final String target)
* throws InterruptedException {
* Future<String> future
* = executor.submit(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* displayOtherThings(); // do other things while searching
* try {
* displayText(future.get()); // use future
* } catch (ExecutionException ex) { cleanup(); return; }
* }
* }}</pre>
*
* The {@link FutureTask} class is an implementation of <tt>Future</tt> that
* implements <tt>Runnable</tt>, and so may be executed by an <tt>Executor</tt>.
* For example, the above construction with <tt>submit</tt> could be replaced by:
* <pre> {@code
* FutureTask<String> future =
* new FutureTask<String>(new Callable<String>() {
* public String call() {
* return searcher.search(target);
* }});
* executor.execute(future);}</pre>
*
* <p>Memory consistency effects: Actions taken by the asynchronous computation
* <a href="package-summary.html#MemoryVisibility"> <i>happen-before</i></a>
* actions following the corresponding {@code Future.get()} in another thread.
*
* @see FutureTask
* @see Executor
* @since 1.5
* @author Doug Lea
* @param <V> The result type returned by this Future's <tt>get</tt> method
*/
public interface Future<V> {

/**
* Attempts to cancel execution of this task. This attempt will
* fail if the task has already completed, has already been cancelled,
* or could not be cancelled for some other reason. If successful,
* and this task has not started when <tt>cancel</tt> is called,
* this task should never run. If the task has already started,
* then the <tt>mayInterruptIfRunning</tt> parameter determines
* whether the thread executing this task should be interrupted in
* an attempt to stop the task.
*
* <p>After this method returns, subsequent calls to {@link #isDone} will
* always return <tt>true</tt>. Subsequent calls to {@link #isCancelled}
* will always return <tt>true</tt> if this method returned <tt>true</tt>.
*
* @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @return <tt>false</tt> if the task could not be cancelled,
* typically because it has already completed normally;
* <tt>true</tt> otherwise
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* Returns <tt>true</tt> if this task was cancelled before it completed
* normally.
*
* @return <tt>true</tt> if this task was cancelled before it completed
*/
boolean isCancelled();

/**
* Returns <tt>true</tt> if this task completed.
*
* Completion may be due to normal termination, an exception, or
* cancellation -- in all of these cases, this method will return
* <tt>true</tt>.
*
* @return <tt>true</tt> if this task completed
*/
boolean isDone();

/**
* Waits if necessary for the computation to complete, and then
* retrieves its result.
*
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
*/
V get() throws InterruptedException, ExecutionException;

/**
* Waits if necessary for at most the given time for the computation
* to complete, and then retrieves its result, if available.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @return the computed result
* @throws CancellationException if the computation was cancelled
* @throws ExecutionException if the computation threw an
* exception
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the wait timed out
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
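The methods declared in the interface above can be exercised roughly as follows. This is only a sketch: the class name FutureMethodsSketch, the 10-second sleep, and the 100 ms timeout are arbitrary choices for illustration. It shows a timed get() giving up with TimeoutException, followed by cancel(true), after which isDone() and isCancelled() both report true and get() throws CancellationException.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureMethodsSketch {

    static String describe() throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // A deliberately slow task so that the timed get() below gives up first.
            Future<String> slow = exec.submit(() -> {
                Thread.sleep(10_000);
                return "never returned";
            });

            StringBuilder log = new StringBuilder();
            try {
                slow.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                log.append("timeout;");
            }

            // The task is still running, so cancellation succeeds and the
            // worker thread is interrupted.
            log.append("cancel=").append(slow.cancel(true)).append(';');
            log.append("isDone=").append(slow.isDone()).append(';');
            log.append("isCancelled=").append(slow.isCancelled()).append(';');

            try {
                slow.get();
            } catch (CancellationException e) {
                log.append("get threw CancellationException");
            }
            return log.toString();
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(describe());
    }
}
```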

Example: Callable returning a Future

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Using a Future with a Callable.
 *
 * @package .CallableDemo
 * @date    2017-04-05 14:53:18
 * @author  pengjunlin
 */
public class CallableFuture {

    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        try {
            // Future is an interface that represents the result of an asynchronous computation.
            Future<String> st = exec.submit(new TaskCallable());
            /* Wait synchronously for the result, giving up after the timeout. */
            System.out.println(st.get(10000, TimeUnit.MILLISECONDS));
            System.out.println("finished");
        } finally {
            // Without shutdown(), idle threads of the cached pool keep the JVM alive for about 60 seconds.
            exec.shutdown();
        }
    }
}

class TaskCallable implements Callable<String> {
    @Override
    public String call() throws Exception {
        Thread.sleep(1000); // simulate a slow task
        return "callstatus=OK";
    }
}
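The example above only covers the success path. As the Javadoc of get() states, an exception thrown inside call() reaches the caller wrapped in an ExecutionException. The following sketch shows one way to unwrap it; the class name ExecutionExceptionSketch and the "boom" message are made up for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ExecutionExceptionSketch {

    static String causeOfFailure() throws InterruptedException {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            // A Callable that always fails (hypothetical, for illustration only).
            Callable<String> failing = () -> {
                throw new IllegalStateException("boom");
            };
            Future<String> future = exec.submit(failing);
            try {
                return future.get(1, TimeUnit.SECONDS);
            } catch (ExecutionException e) {
                // The original exception is available as the cause.
                Throwable cause = e.getCause();
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            } catch (TimeoutException e) {
                future.cancel(true); // stop waiting for a task that is too slow
                return "timed out";
            }
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(causeOfFailure()); // prints IllegalStateException: boom
    }
}
```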


FutureTask

FutureTask implements the java.util.concurrent.RunnableFuture<V> interface, which extends both Runnable and Future<V>, so in effect it implements those two interfaces.
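A small sketch of what that dual role means in practice: the same FutureTask object is handed to an executor through its Runnable side and read through its Future side. The class name FutureTaskDualRoleSketch and the "done" result are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

class FutureTaskDualRoleSketch {

    static String runViaExecutor() throws Exception {
        FutureTask<String> task = new FutureTask<>(() -> "done");

        // One object, two views.
        Runnable asRunnable = task;     // what the executor sees
        Future<String> asFuture = task; // what the caller reads from

        ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            exec.execute(asRunnable);
            return asFuture.get(1, TimeUnit.SECONDS);
        } finally {
            exec.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runViaExecutor()); // prints done
    }
}
```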

FutureTask source

/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 *
 *
 */

/*
 *
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V> {
    /*
     * Revision notes: This differs from previous versions of this
     * class that relied on AbstractQueuedSynchronizer, mainly to
     * avoid surprising users about retaining interrupt status during
     * cancellation races. Sync control in the current design relies
     * on a "state" field updated via CAS to track completion, along
     * with a simple Treiber stack to hold waiting threads.
     *
     * Style note: As usual, we bypass overhead of using
     * AtomicXFieldUpdaters and instead directly use Unsafe intrinsics.
     */

    /**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    /**
     * Returns result or throws exception for completed task.
     *
     * @param s completed state value
     */
    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Callable}.
     *
     * @param  callable the callable task
     * @throws NullPointerException if the callable is null
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a {@code FutureTask} that will, upon running, execute the
     * given {@code Runnable}, and arrange that {@code get} will return the
     * given result on successful completion.
     *
     * @param runnable the runnable task
     * @param result the result to return on successful completion. If
     * you don't need a particular result, consider using
     * constructions of the form:
     * {@code Future<?> f = new FutureTask<Void>(runnable, null)}
     * @throws NullPointerException if the runnable is null
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (state != NEW)
            return false;
        if (mayInterruptIfRunning) {
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        }
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    /**
     * Protected method invoked when this task transitions to state
     * {@code isDone} (whether normally or via cancellation). The
     * default implementation does nothing.  Subclasses may override
     * this method to invoke completion callbacks or perform
     * bookkeeping. Note that you can query status inside the
     * implementation of this method to determine whether this task
     * has been cancelled.
     */
    protected void done() { }

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
     * Causes this future to report an {@link ExecutionException}
     * with the given throwable as its cause, unless this future has
     * already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon failure of the computation.
     *
     * @param t the cause of failure
     */
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Executes the computation without setting its result, and then
     * resets this future to initial state, failing to do so if the
     * computation encounters an exception or is cancelled.  This is
     * designed for use with tasks that intrinsically execute more
     * than once.
     *
     * @return true if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    /**
     * Ensures that any interrupt from a possible cancel(true) is only
     * delivered to a task while in run or runAndReset.
     */
    private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

    /**
     * Tries to unlink a timed-out or interrupted wait node to avoid
     * accumulating garbage.  Internal nodes are simply unspliced
     * without CAS since it is harmless if they are traversed anyway
     * by releasers.  To avoid effects of unsplicing from already
     * removed nodes, the list is retraversed in case of an apparent
     * race.  This is slow when there are a lot of nodes, but we don't
     * expect lists to be long enough to outweigh higher-overhead
     * schemes.
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}
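Two details of the source above lend themselves to a short experiment. First, the Javadoc of done() says subclasses may override it to invoke completion callbacks, which is the closest FutureTask comes to an actual callback. Second, run() returns immediately when state is no longer NEW, so a second call does nothing. The sketch below illustrates both; NotifyingTask and CallbackFutureTaskSketch are hypothetical names made up for this illustration and are not part of the JDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class CallbackFutureTaskSketch {

    // Hypothetical subclass: runs a user-supplied callback when the task
    // reaches a done state (normal completion, exception, or cancellation).
    static class NotifyingTask<V> extends FutureTask<V> {
        private final Runnable onDone;

        NotifyingTask(Callable<V> callable, Runnable onDone) {
            super(callable);
            this.onDone = onDone;
        }

        @Override
        protected void done() {
            // Called on the thread that completes or cancels the task.
            onDone.run();
        }
    }

    // Returns { result, number of call() invocations, number of done() callbacks }.
    static int[] runTwice() throws Exception {
        AtomicInteger calls = new AtomicInteger();
        AtomicInteger callbacks = new AtomicInteger();

        NotifyingTask<Integer> task = new NotifyingTask<>(
                () -> calls.incrementAndGet(),
                () -> callbacks.incrementAndGet());

        task.run(); // executes call(), stores the result, then invokes done()
        task.run(); // no effect: state is no longer NEW

        return new int[] { task.get(), calls.get(), callbacks.get() };
    }

    public static void main(String[] args) throws Exception {
        int[] r = runTwice();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 1 1 1
    }
}
```

Since done() runs on whichever thread finishes the task, a callback written this way should be short and thread-safe.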

Example: running a Callable through a FutureTask

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

/**
 * Using a FutureTask with a Callable.
 *
 * @package .MyFutureTask
 * @date    2017-04-05 14:56:50
 * @author  pengjunlin
 */
public class MyFutureTask {

    public static void main(String[] args) {
        Callable<int[]> primeCallable = new PrimeCallable(1000);
        FutureTask<int[]> primeTask = new FutureTask<int[]>(primeCallable);
        Thread t = new Thread(primeTask);
        t.start();
        try {
            // Pretend to do other work for a while.
            Thread.sleep(5000);
            // Come back and check whether the primes are ready.
            if (primeTask.isDone()) {
                int[] primes = primeTask.get();
                for (int prime : primes) {
                    System.out.print(prime + " ");
                }
                System.out.println();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

class PrimeCallable implements Callable<int[]> {
    private final int max;

    public PrimeCallable(int max) {
        this.max = max;
    }

    @Override
    public int[] call() throws Exception {
        // prime[i] == 1 means i is still considered prime.
        int[] prime = new int[max + 1];
        List<Integer> list = new ArrayList<Integer>();
        for (int i = 2; i <= max; i++)
            prime[i] = 1;
        // Sieve of Eratosthenes: cross out the multiples of each prime, starting at i * i.
        for (int i = 2; i * i <= max; i++) {
            if (prime[i] == 1) {
                for (int j = i * i; j <= max; j += i)
                    prime[j] = 0;
            }
        }
        // Collect every prime in the range [2, max].
        for (int i = 2; i <= max; i++) {
            if (prime[i] == 1) {
                list.add(i);
            }
        }
        int[] p = new int[list.size()];
        for (int i = 0; i < p.length; i++) {
            p[i] = list.get(i).intValue();
        }
        return p;
    }
}