第四章 Thread Executors(Executors多线程架构)【中】

时间:2022-09-14 18:35:25
本章涉及内容:
  • 创建一个executor线程
  • 创建一个固定大小executor
  • 执行executor任务返回一个结果
  • 处理多任务和处理第一个结果
  • 处理多任务和处理所有结果
  • 延迟之后运行executor
  • 周期地运行任务的executor
  • 在executor取消任务
  • 在executor控制任务完成
  • 在executor分开task启动和处理结果
  • 控制executor的拒绝的任务

1、处理多任务和处理所有结果

实现等待其他任务完成之后再进行

isDone() :判断线程是否执行完毕

awaitTermination() : 等待所有任务完成之后调用shutdown()方法

例子: 三个任务执行完之后打印结果

package com.jack;

public class Result {

private String name;
private int value;

public String getName() {
return name;
}
public int getValue() {
return value;
}
public void setName(String name) {
this.name = name;
}
public void setValue(int value) {
this.value = value;
}


}
package com.jack;import java.util.Date;import java.util.concurrent.Callable;import java.util.concurrent.TimeUnit;public class Task implements Callable<Result> {private String name;public Task(String name) {super();this.name = name;}@Overridepublic Result call() {System.out.printf("%s : 字符串\n", this.name);long duration = (long) (Math.random()*10);System.out.printf("%s:等待 %d 秒结果\n", this.name,duration);try {TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {e.printStackTrace();}int value =0;for (int i=0; i<5; i++){value += (int)(Math.random()*100);}Result result = new Result();result.setName(this.name);result.setValue(value);System.out.printf(this.name + ": 结果\n");return result;}}

package com.jack;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();

List<Task> taskList = new ArrayList<>();
for(int i=0; i<3; i++){
Task task = new Task("Task"+i);
taskList.add(task);
}

List<Future<Result>> resultList =null;
try{
//等待所有任务执行完毕才往后走
resultList = executor.invokeAll(taskList);

}catch (InterruptedException e){
e.printStackTrace();
}
executor.shutdown();
System.out.printf("Main : 打印结果\n");
for(int i=0; i<resultList.size(); i++){
Future<Result> future = resultList.get(i);
try {
Result result = future.get();
System.out.printf(result.getName() + ": " + result.getValue()+"\n");

} catch (InterruptedException | ExecutionException e){
e.printStackTrace();
}
}
}
}
日志:

Task0 : 字符串
Task2 : 字符串
Task1 : 字符串
Task0:等待 9 秒结果
Task1:等待 0 秒结果
Task2:等待 9 秒结果
Task1: 结果
Task0: 结果
Task2: 结果
Main : 打印结果
Task0: 273
Task1: 276
Task2: 132


总结:

  • 1、创建一个ExecutorService
  • 2、创建任务List集合
  • 3、创建接受结果List<Future<Result>> 集合
  • 4、executor.invokeAll(taskList) 执行所有任务完成之后才继续后面的代码
  • 5、最后打印结果

2、executor的延迟操作

ScheduledThreadPoolExecutor 周期执行任务

package com.jack;

import java.util.Date;
import java.util.concurrent.Callable;

public class Task implements Callable<String> {

private String name;

public Task(String name) {
super();
this.name = name;
}

@Override
public String call() throws Exception {
System.out.printf("%s: 开始于: %s\n", name, new Date());
return "Hello, world";
}

}
package com.jack;import java.util.Date;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class Main {public static void main(String[] args) {ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);System.out.printf("Main : 开始于 : %s\n", new Date());for (int i=0; i<5; i++){Task task = new Task ("Task " + i);executor.schedule(task, i+2, TimeUnit.SECONDS);}executor.shutdown();try {executor.awaitTermination(1, TimeUnit.DAYS);} catch (InterruptedException e){e.printStackTrace();}System.out.printf("Main : 结束于:%s\n", new Date());}}

日志:

Main : 开始于 : Wed Aug 16 11:28:19 CST 2017
Task 0: 开始于: Wed Aug 16 11:28:21 CST 2017
Task 1: 开始于: Wed Aug 16 11:28:22 CST 2017
Task 2: 开始于: Wed Aug 16 11:28:23 CST 2017
Task 3: 开始于: Wed Aug 16 11:28:24 CST 2017
Task 4: 开始于: Wed Aug 16 11:28:25 CST 2017
Main : 结束于:Wed Aug 16 11:28:25 CST 2017
总结:

  • 1、创建ScheduledThreadPoolExecutor对象
  • 2、执行executor.schedule(task, i+2, TimeUnit.SECONDS); (第一个参数是任务(不一定是Callable接口,Runnable接口亦可以),第二参数,数值(延迟多少),第三参数单位,(上面显示的秒,也可以是天,小时等等))
  • 3、executor.shutdown();关闭executor

  • 4、executor.awaitTermination(1, TimeUnit.DAYS); 防止Main线程提前退出
  • 5、executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);会在没有执行完就直接关闭了executor

3、周期性运行任务

package com.jack;

import java.util.Date;

public class Task implements Runnable {

private String name;

public Task(String name) {
super();
this.name = name;
}

@Override
public void run() {
System.out.printf("%s: 开始于: %s\n", name, new Date());
}

}
总结:实现Runnable接口而不是Callable接口

package com.jack;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
public static void main(String[] args) {
ScheduledThreadPoolExecutor executor =
(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1);

System.out.printf("Main : 开始于 : %s\n", new Date());
Task task = new Task ("Task ");
ScheduledFuture<?> result=executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
for (int i=0; i<20; i++){
System.out.printf("Main: 延迟: %d\n", result.getDelay(TimeUnit.MILLISECONDS));
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
executor.shutdown();

try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e){
e.printStackTrace();
}
System.out.printf("Main : 结束于:%s\n", new Date());
}
}

日志:

第四章 Thread Executors(Executors多线程架构)【中】

总结:

  • 1、ScheduledFuture<?> result=executor.scheduleAtFixedRate(task,1, 2, TimeUnit.SECONDS);
  • 第一个参数:实现Runnable的实例
  • 第二个参数:表示初始化延迟
  • 第三个参数:表示延迟的间隔时间
  • 第四个参数:单位(上面实例代表是秒)
  • 2、result.getDelay(TimeUnit.MILLISECONDS) 获取对象的延迟时间值
  • 3、如果你取消executor.shutdown(); 会一直按照周期执行下去

4、在executor取消任务

使用Future的cancel() 方法实现

package com.jack;

import java.util.concurrent.Callable;

public class Task implements Callable<String> {


@Override
public String call() throws Exception {
while (true){
System.out.printf("Task: Test\n");
Thread.sleep(100);
}
}


}

package com.jack;

import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main {
public static void main(String[] args) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
Task task = new Task();
System.out.printf("Main : 执行任务\n");
Future<String> result = executor.submit(task);

try{
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e){
e.printStackTrace();
}
System.out.printf("Main : 取消这个任务");
result.cancel(true);
System.out.printf("Main : 取消了吗?: %s\n", result.isCancelled());
System.out.printf("Main 完成了:%s\n", result.isDone());
executor.shutdown();
System.out.printf("Main : 执行已经完成了\n");
}
}

日志:

第四章 Thread Executors(Executors多线程架构)【中】

总结:

result.cancel(true);