多线程-Callable、Future、FutureTask

时间:2023-03-09 09:01:08
多线程-Callable、Future、FutureTask

我们普遍知道的创建线程的方式有两种,一种是继承Thread,一种是实现Runnable接口。这两种方式都无法获取任务执行完后的结果,并发包提供了Callable 类能够得到任务执行完的结果。

为何需要Future与Callable的模式?我们先用常用方式来实现需求。获取线程执行完后的结果。

public class Demo1 {
public static void main(String[] args) {
Callable callable = new Callable() {
@Override
public void call(int num) {
System.out.println("线程运行结果:"+num);
}
};
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("线程:"+Thread.currentThread().getName()+"开始");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
callable.call(100);
System.out.println("线程:"+Thread.currentThread().getName()+"结束");
}
},"t1").start();
System.out.println("主线程结束");
}
}
interface Callable{
void call(int num);
}

 一般我们使用回调的方式来获取结果。但这种方式有三个缺点:

  1、必须要有回调接口。并根据线程运行情况,接口至少要拥有成功回调和错误回调两个方法。

  2、当线程运行后,只能等到线程回调接口,开发者没办法进行取消操作。

  3、如果要重复获取同样线程运行结果,只能重新运行线程。或缓存一个变量结果值。

  因此java提供了Future与Callable的模式


Callable:

  位于java.util.concurrent包下的一个接口,声明了一个call()方法。

public interface Callable<V> {
// 计算结果,如果无法计算结果,则抛出一个异常
V call() throws Exception;
}

   Callable一般与ExecutorService配合使用。在ExecutorService接口中声明了几个submit方法的重载:

//提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future
<T> Future<T> submit(Callable<T> task);
//提交一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象
<T> Future<T> submit(Runnable task, T result);
//提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future
Future<?> submit(Runnable task);

  对比Runnable:

Callable与Runnable的区别:
 
(1)Callable规定的方法是call(),而Runnable规定的方法是run()。 (2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。 (3)call()方法可抛出异常,而run()方法是不能抛出异常的。 (4)运行Callable任务可拿到一个Future对象。 (5) 加入线程池运行时,Runnable使用的是ExecutorService的execute方法,而Callable使用的submit方法。 

Future:

  Future可以对具体的Runnable或Callable任务的执行结果进行取消,查询是否完成、获取结果。可通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

  Future是位于java.util.concurrent包下的一个接口:

public interface Future<V> {
// 试图取消对此任务的执行,参数表示是否允许取消正在执行却未执行完的任务
boolean cancel(boolean mayInterruptIfRunning);
// 如果在任务正常完成前将其取消,则返回true
boolean isCancelled();
// 如果任务已完成,则返回true
boolean isDone();
// 获取执行结果,产生阻塞直到任务执行完毕才返回
V get() throws InterruptedException, ExecutionException;
// 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

  Futre提供的三种功能:判断任务是否完成、能够中断任务、能够获取 任务结果。Future是接口,无法直接创建对象来使用,因此就有了FutureTask。


FutureTask:

  FutureTask实现了RunnableFuture接口,而RunnableFuture继承了Runnable接口和Future接口。所以它可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。它是Future接口的一个唯一实现类。

  FutureTask的状态:

    1、未启动:当FutureTask.run()方法未被执行前,处于未启动状态。当创建一个FutureTask但run方法未执行前,也处于未启动状态。

    2、已启动:FutureTask.run()被执行的过程中,处于已启动状态。

    3、已完成:FutureTask.run()执行完正常结束或被取消或抛出异常而结束,都处于完成状态。

  FutureTask的执行:

  多线程-Callable、Future、FutureTask

    1、未启动或已启动状态下,执行FutureTask.get()方法会导致调用线程阻塞。已完成状态下,执行该方法调用线程会立即返回结果或抛出异常。

    2、未启动状态下,执行FutureTask.cancel()方法将导致任务永不执行。已启动状态下,执行cancel(true)方法将以中断执行此任务线程的方式来试图停止任务。取消成功返回true。但如果执行cancel(false)方法将不会对正在执行的任务线程产生影响(让线程正常执行到完成),此时cancel(...)返回false。当任务已经完成,执行cancel(...)方法将返回false。


使用实例:

Callable实现类

/**
* @Title: CallableServiceImpl
* @Description: Callable实现类
* @date 2019/1/1814:18
*/
public class CallableServiceImpl implements Callable<Integer> {
private static final Logger logger = LoggerFactory.getLogger(CallableServiceImpl.class);
private int sum;
@Override
public Integer call() throws Exception {
logger.info("callable子线程开始计算");
Thread.sleep(2000);
for (int i = 0; i < 100; i++) {
sum = sum + i;
}
logger.info("callable子线程结束计算");
return sum;
}
}

Callable+Future

/**
* @Title: FutureClient
* @Description:
* @date 2019/1/1814:21
*/
public class FutureClient {
private static final Logger logger = LoggerFactory.getLogger(FutureClient.class); public static void main(String[] args) {
//创建线程池
ExecutorService es = Executors.newSingleThreadExecutor();
//创建任务
CallableServiceImpl task = new CallableServiceImpl();
//提交任务并获取执行结果
Future<Integer> future = es.submit(task);
//关闭线程池
es.shutdown();
try {
Thread.sleep(2000);
if (future.get()!=null){
logger.info("结果是:"+future.get());
}else {
logger.info("未获取到结果");
}
} catch (Exception e) {
logger.error("FutureClient error {}",e);
}
logger.info("主线程执行完成");
}
}

Callable+FutureTask

/**
* @Title: FutureTaskClient
* @Description:
* @date 2019/1/1814:26
*/
public class FutureTaskClient {
private static final Logger logger = LoggerFactory.getLogger(FutureTaskClient.class); public static void main(String[] args) {
//创建线程池
ExecutorService es = Executors.newSingleThreadExecutor();
//创建任务
CallableServiceImpl task = new CallableServiceImpl();
//提交任务
FutureTask<Integer> futureTask = new FutureTask<>(task);
es.submit(futureTask);
//关闭线程池
es.shutdown();
try {
Thread.sleep(2000);
if (futureTask.get()!=null){
logger.info("结果是:"+futureTask.get());
}else {
logger.info("未获取到结果");
}
} catch (Exception e) {
logger.error("FutureTaskClient error {}",e);
}
logger.info("主线程执行完成");
}
}