线程池——JAVA并发编程指南

时间:2022-02-16 06:14:35

TPS00-J. 用线程池实现应用在流量暴涨时优雅降级

很多程序都要解决这样一个问题——处理一系列外来的请求。Thread- Per-Message这种设计模式是最简单的并发策略了,它为每一个请求创建一个线程。这种模式在耗时较长,受io约束,基于session或者,任务相互独立等场景下表现优于顺序处理。

但是,这种设计也有几个缺陷,包括线程创建和调用,任务处理,资源分配和释放,和频繁的上下文切换等带来的的开销。此外,攻击者可以通过一下子发起铺天盖地的请求造展开DoS攻击。系统不能优雅的降级,而是变得反应迟钝,从而导致拒绝服务。从安全的角度来看,由于一些偶现错误,一个组件可以用尽所有资源,饿死所有其他组件。

线程池允许系统在它承受力充裕的范围内处理尽可能多的请求。而不是一遇到过量的请求就挂掉。线程池通过限制初始化的工作线程数和同时运行的线程数来克服这些问题。每个支持线程池的对象都接受一个Runnable或Callable<T>的任务,并将其存储在临时队列中,直到有资源可用。因为在一个线程池的线程可以重复使用,并能快速的地加入线程池货从其中移出,管理线程的生命周期的开销被最小化。

不规范代码示例

这个示例演示了Thread-Per-Message设计模式,RequestHandler类提供了一个public static的工厂方法。调用者可以通过此方法获得Handler的实例,然后在各自的线程中调用handleRequest()处理请求。

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket; class Helper {
public void handle(Socket socket) {
// ... }
}
} final class RequestHandler {
private final Helper helper = new Helper();
private final ServerSocket server; private RequestHandler(int port) throws IOException {
server = new ServerSocket(port);
} public static RequestHandler newInstance() throws IOException {
return new RequestHandler(0); // 自动获得端口
} public void handleRequest() {
new Thread(new Runnable() {
public void run() {
try {
helper.handle(server.accept());
} catch (IOException e) {
// Forward to handler
}
}
}).start();
}
}

Thread-Per-Message设计模式无法让服务优雅的降级。只要稀缺资源没有耗尽,系统还是可以正常的提供服务。举例来说,虽然可以创建很多的线程,系统中打开的文件描述符是有限的,文件描述符就是稀缺资源。如果我们的稀缺资源是内存,系统会突然的停止服务。

TPS01-J. 不要在有界线程池中运行互相依赖的任务

有界线程池是指其同一时刻执行任务的线程总数有上限。需要等待其他任务完成的任务不应该放到游街线程池中执行

有一种死锁叫做线程饿死型死锁,当线程池中的所有线程都在等待一个尚未开始的任务(只是进入了线程池的内部队列),这种死锁就会发生了

这种问题很有欺骗性,因为当需要的线程较少的时候程序可以正常的运行。有时候调大线程数就可以缓解这个问题。但是,确定合适的数量通常很难

不规范代码示例(子任务有依赖关系)

下面的例子有线程饿死的风险,本例包含一个ValidationService类负责执行各种检查,比如到后台的数据库检查用户输入的字段是否存在。

fieldAggregator()接受字符串类型的可变长的参数,并且为每一个参数创建一个任务,以便执行。每个任务使用ValidateInput执行验证。

反过来,ValidateInput 类将尝试为每个请求创建一个子任务,在任务重使用 SanitizeInput 类净化输入。在同一个线程池执行所有任务。在所有任务都执行完毕前FieldAggregator() 方法一直被阻塞,当所有结果都都可用了,FieldAggregator把返回结果汇总成 StringBuilder 对象返回给给调用者。

public final class ValidationService {
private final ExecutorService pool; public ValidationService(int poolSize) {
pool = Executors.newFixedThreadPool(poolSize);
} public void shutdown() {
pool.shutdown();
} public StringBuilder fieldAggregator(String... inputs)
throws InterruptedException, ExecutionException {
StringBuilder sb = new StringBuilder();
Future<String>[] results = new Future[inputs.length]; // 保存结果 for (int i = 0; i < inputs.length; i++) { // 把任务提交到线程池 results[i] = pool
.submit(new ValidateInput<String>(inputs[i], pool));
}
for (int i = 0; i < inputs.length; i++) { // 把结果汇总
sb.append(results[i].get());
}
return sb;
}
} public final class ValidateInput<V> implements Callable<V> {
private final V input;
private final ExecutorService pool; ValidateInput(V input, ExecutorService pool) {
this.input = input;
this.pool = pool;
} @Override
public V call() throws Exception {
// 如果验证失败,在此处抛出异常
Future<V> future = pool.submit(new SanitizeInput<V>(input)); // Subtask
return (V) future.get();
}
} public final class SanitizeInput<V> implements Callable<V> {
private final V input; SanitizeInput(V input) {
this.input = input;
} @Override
public V call() throws Exception {
// Sanitize input and return
return (V) input;
}
}

假设池大小设置为6,调用ValidationService.fieldAggregator()方法来验证6个参数,它提交6项任务的线程池。每个任务提交相应的子任务来清理输入。只要SanitizeInput子任务执行了,验证的线程就可以返回它们结果了,这些线程可以返回他们的结果。然而,这是不可能的,因为在线程池中所有六个线程,它们都被阻塞了。此外,由于还有活跃任务, 调用shutdown()方法无法关闭线程池。

标准代码示例(互相不依赖的任务)

在标准的代码中修改了ValidateInput<V>,SanitizeInput任务和它在同一个线程中执行。这样ValidateInput和SanitizeInput就独立开来,不再需要等待另一个执行结束。对SanitizeInput也做了修改,不再实现Callable接口

public final class ValidationService {
// ...
public StringBuilder fieldAggregator(String... inputs)
throws InterruptedException, ExecutionException {
// ...
for (int i = 0; i < inputs.length; i++) {
// 不把线程池传进去
results[i] = pool.submit(new ValidateInput<String>(inputs[i]));
}
// ...
}
} //不再使用同一个线程池
public final class ValidateInput<V> implements Callable<V> {
private final V input; ValidateInput(V input) {
this.input = input;
} @Override
public V call() throws Exception {
// 如果验证失败,抛出异常
return (V) new SanitizeInput().sanitize(input);
}
} public final class SanitizeInput<V> { // 不实现Callable接口
public SanitizeInput() {
} public V sanitize(V input) {
// 净化输入并返回
return input;
}
}

不规范代码(子任务)

本示例包含了一些列的子任务,运行在一个共享的线程池中。BrowserManager类调用perUser(), perUser()创建子任务来调用perProfile(), perProfile()又创建子任务来调用perTab(),最终perTab()又创建子任务调用doSomething()。BrowserManager等待所有的子任务结束。

public final class BrowserManager {
private final ExecutorService pool = Executors.newFixedThreadPool(10);
private final int numberOfTimes;
private static AtomicInteger count = new AtomicInteger(); // count = 0 public BrowserManager(int n) {
numberOfTimes = n;
} public void perUser() {
methodInvoker(numberOfTimes, "perProfile");
pool.shutdown();
} public void perProfile() {
methodInvoker(numberOfTimes, "perTab");
} public void perTab() {
methodInvoker(numberOfTimes, "doSomething");
} public void doSomething() {
System.out.println(count.getAndIncrement());
} public void methodInvoker(int n, final String method) {
final BrowserManager manager = this;
Callable<Object> callable = new Callable<Object>() {
public Object call() throws Exception {
Method meth = manager.getClass().getMethod(method);
return meth.invoke(manager);
}
};
Collection<Callable<Object>> collection = Collections.nCopies(n, callable);
try {
Collection<Future<Object>> futures = pool.invokeAll(collection);
} catch (InterruptedException e) {
// Forward to handler
Thread.currentThread().interrupt(); // Reset interrupted status
}
// ...
} public static void main(String[] args) {
BrowserManager manager = new BrowserManager(5);
manager.perUser();
}
}

不幸的是,这个方案很容易出现线程饿死型死锁。例如,5个perUser()任务,每个生5个perProfile()任务,每个perProfile()任务生成5个perTab()任务,线程池会被耗尽,没有剩余的线程给而perTab来调用doSomething()方法。

java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<Object>>) 会等待所有任务结束。

标准代码示例(CallerRunsPolicy)

在这个标准的解决方案中,对任务进行了选择和调度,,避免了线程饥饿死锁。它设置了ThreadPoolExecutor的CallerRunsPolicy,并且使用了SynchronousQueue。这个策略要求,如果线程池的线程耗尽,所有后继的任务都在其发起的线程中执行

public final class BrowserManager {
private final static ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
private final int numberOfTimes;
private static AtomicInteger count = new AtomicInteger(); // count = 0
static {
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
// ...
}

TPS02-J. 确保提交给一个线程池的任务是可中断的

TPS03-J. 确保线程池中的任务不会默默的失败

TPS04-J. 使用线程池时要确保ThreadLocal被重置