多线程并发: 带任务描述和执行时间的线程池执行简易框架

时间:2022-05-16 18:11:41


      定位

      本文阐述一个带任务描述和执行时间的线程池执行简易框架, 可以方便地编写多任务程序, 并观察每个线程所代表的任务类型、内容及其执行时间, 以便为线程池调优收集重要参考数据。适合于想要快速在应用工程中添加多线程基础设施来执行并发任务的筒鞋。

 

      示例工程在 http://download.csdn.net/detail/shuqin1984/6368559 下载。以文件搜索为例: 给定一个目录及文件匹配表达式, 列出所有匹配的文件。 一个很相近的例子请参阅《Java虚拟机并发编程》第四章P38.


       1.   定义任务描述接口, 详细描述何种任务类型及任务内容, 以便于与相应执行时间对应起来, 了解不同任务的执行时间。        

package multithreading.customized;

public interface TaskInfo {

// 描述任务的具体内容, 便于线程池监控使用
public String desc();

}

 

      2.  任务的具体定义, 包括文件匹配任务和文件抽取任务, 分别对应两种思路。 

package multithreading.callable;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.regex.Pattern;

import multithreading.customized.TaskInfo;

public class FileMatchCallable implements Callable<List<File>>, TaskInfo {

private File directory; // 要匹配的非目录文件
private Pattern pattern; // 要匹配的文件名模式

public FileMatchCallable(File directory, Pattern pattern) {
this.directory = directory;
this.pattern = pattern;
}

public String desc() {
return "[FileMatchTask](" + "dir: " + directory.getAbsolutePath() + ", "
+ "pattern: " + pattern + ")";
}

public List<File> call() throws Exception {
List
<File> result = new ArrayList<File>();
if (directory.isFile()) {
boolean matched = pattern.matcher(directory.getName()).matches();
if (matched) {
result.add(directory);
}
}
return result;
}

}

 

package multithreading.callable;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

import multithreading.customized.TaskInfo;

public class FileExtractCallable implements Callable<List<File>>, TaskInfo {

private File directory;

public FileExtractCallable(File directory) {
this.directory = directory;
}

public String desc() {
return "[FileExtractTask](" + "dir: " + directory + ")";
}

public List<File> call() throws Exception {
List
<File> result = new ArrayList<File>();
extractFiles(directory, result);
return result;
}

private void extractFiles(File dir, List<File> extractedFiles) {
if (dir.isFile()) {
extractedFiles.add(dir);
}
else if (dir.isDirectory()) {
File[] files
= dir.listFiles();
for (File file: files) {
extractFiles(file, extractedFiles);
}
}
}
}

 

      3.  可暴露 Callable 的 FutureTask , 这是由于默认的 FutureTask 将传入的 Callable 保护的紧紧的。 在线程池中必须获取到FutureTask 里的 Callable 来得到相应的 TaskInfo. 也可以采用 ConcurrentMap<FutureTask, String> taskInfoCache 对 FutureTask 与相应的 TaskInfo 做映射, 但由于存在对 taskInfoCache 的并发读写会影响到正常服务, 因此, 采用扩展 FutureTask 的方式来获取 Callable . 

package multithreading.customized;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class FutureTaskWithCallableAvailed<T> extends FutureTask<T> {

private Callable<T> task;

public FutureTaskWithCallableAvailed(Callable<T> task) {
super(task);
this.task = task;
}

public Callable<T> getTask() {
return task;
}

}

 

    4.   可记录线程执行时间的线程池。 既然是框架,就应当最好在同一处进行处理, 而不要将相似的代码分布到各处。 可参考《Java并发编程实战》P149.

package multithreading;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import multithreading.customized.FutureTaskWithCallableAvailed;
import multithreading.customized.TaskInfo;

import org.apache.log4j.Logger;

public class MyThreadPoolExecutor {

private static final Logger log = Logger.getLogger("threadMonitor");

private static ConcurrentMap<String, MyThreadPoolExecutor> poolCache = new ConcurrentHashMap<String, MyThreadPoolExecutor>();

private static MyThreadPoolExecutor myThreadPool = null;

private int corePoolSize = 3; // 线程池维护线程的最小数量

private int maximumPoolSize = 5; // 线程池维护线程的最大数量

private long keepAliveTime = 60; // 线程池维护线程所允许的空闲时间

private final TimeUnit unit = TimeUnit.SECONDS; // 线程池维护线程所允许的空闲时间的单位

private BlockingQueue<Runnable> workQueue = null; // 线程池所使用的缓冲队列

private final RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); // 线程池对拒绝任务的处理策略

private ThreadPoolExecutor threadPool = null;

private MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, int timeoutQueueSize,
String namePrefix) {
this.corePoolSize = corePoolSize > 0 ? corePoolSize : this.corePoolSize;
this.maximumPoolSize = maximumPoolSize > 0 ? maximumPoolSize : this.maximumPoolSize;
this.keepAliveTime = keepAliveTime;
workQueue
= new ArrayBlockingQueue<Runnable>(timeoutQueueSize);
threadPool
= new MonitoredThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
new MoonmmThreadFactory(namePrefix), handler);
}

/**
* 实例化线程池
*/
public static MyThreadPoolExecutor getInstance(int corePoolSize, int maximumPoolSize, long keepAliveTime,
int timeoutQueueSize, String namePrefix) {
if (poolCache.get(namePrefix) == null) {
myThreadPool
= new MyThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
timeoutQueueSize, namePrefix);
poolCache.put(namePrefix, myThreadPool);
}
return myThreadPool;
}

/**
* 通过线程池执行Runable
*/
public void execute(FutureTask<?> task) {
threadPool.execute(task);
}

/**
* 关闭所有线程
*/
public void shutdown() {
threadPool.shutdown();
}

/**
* 返回核心线程数
*
*
@return
*/
public int getCorePoolSize() {
return corePoolSize;
}

/**
* 返回最大线程数
*
*
@return
*/
public int getMaximumPoolSize() {
return maximumPoolSize;
}

/**
* 返回线程的最大空闲时间
*
*
@return
*/
public long getKeepAliveTime() {
return keepAliveTime;
}

/**
* 线程工厂类
*/
static class MoonmmThreadFactory implements ThreadFactory {

final AtomicInteger threadNumber = new AtomicInteger(1);
String namePrefix
= "";

public MoonmmThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}

public Thread newThread(Runnable r) {
Thread t
= new Thread(r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(
false);
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

static class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {

public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue
<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}

private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();

protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
startTime.set(System.currentTimeMillis());
}

protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.currentTimeMillis();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
if (r instanceof FutureTaskWithCallableAvailed) {
Callable c
= ((FutureTaskWithCallableAvailed)r).getTask();
if (c instanceof TaskInfo) {
String taskInfo
= ((TaskInfo)c).desc();
log.info(String.format(
"Task %s: time=%dms", taskInfo, taskTime));
}
}
}
finally {
super.afterExecute(r, t);
}
}

public void logThreadPoolMonitorData() {
log.info(
"total tasks completed: " + numTasks.get());
log.info(
"totalTime: " + totalTime.get());
}

public Map<String, Object> obtainThreadPoolMonitorData() {
Map
<String, Object> monitorData = new HashMap<String, Object>();
monitorData.put(
"total_task", numTasks.get());
monitorData.put(
"total_time", totalTime.get());
return monitorData;
}

}

public static <T> List<T> getMultiTaskResult(List<FutureTask<List<T>>> futureTaskList) {
List
<T> results = new ArrayList<T>();
for (FutureTask<List<T>> futureTask : futureTaskList) {
try {
// 每个线程设置固定的执行时间,过期不候
List<T> partResultList = futureTask.get(ThreadConstants.TASK_WAIT_TIME, TimeUnit.SECONDS);
if (partResultList != null && partResultList.size() > 0) {
for (T file : partResultList) {
results.add(file);
}
}
}
catch (TimeoutException e) {
log.error(futureTask.getClass()
+ " Multi thread timeout error: " + Thread.currentThread().getName(),
e);
}
catch (InterruptedException e) {
log.error(futureTask.getClass()
+ " Multi thread interrupted error: "
+ Thread.currentThread().getName(), e);
}
catch (ExecutionException e) {
log.error(futureTask.getClass()
+ " Multi thread execution error: "
+ Thread.currentThread().getName(), e);
}
}
return results;
}

}

 

     5.  线程池的Spring 配置(日志配置见工程), 尽量使用成熟框架做一些基础工作。 对于 CPU 密集型和 IO 密集型的任务, 要使用不同的线程池配置。 CPU 密集型: 线程数 = 可用CPU+1, 任务数 >> 线程数; IO 密集型: 线程数 >> CPU数, 任务数与线程数相当。 

<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation
="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd"
>

<!-- thread pool -->
<bean id="ioThreadPoolExecutor"
class
="multithreading.MyThreadPoolExecutor"
factory-method
="getInstance">
<constructor-arg index="0" type="int">
<value>60</value>
</constructor-arg>
<constructor-arg index="1" type="int">
<value>60</value>
</constructor-arg>
<constructor-arg index="2" type="long">
<value>60</value>
</constructor-arg>
<constructor-arg index="3" type="int">
<value>40</value>
</constructor-arg>
<constructor-arg index="4" type="java.lang.String">
<value>iointensive-execute-thread</value>
</constructor-arg>
</bean>

<bean id="cpuThreadPoolExecutor"
class
="multithreading.MyThreadPoolExecutor"
factory-method
="getInstance">
<constructor-arg index="0" type="int">
<value>3</value>
</constructor-arg>
<constructor-arg index="1" type="int">
<value>3</value>
</constructor-arg>
<constructor-arg index="2" type="long">
<value>60</value>
</constructor-arg>
<constructor-arg index="3" type="int">
<value>120</value>
</constructor-arg>
<constructor-arg index="4" type="java.lang.String">
<value>cpuintensive-execute-thread</value>
</constructor-arg>
</bean>

<bean id="fileSearchService" class="multithreading.service.FileSearchService">
<property name="ioThreadPoolExecutor" ref="ioThreadPoolExecutor" />
<property name="cpuThreadPoolExecutor" ref="cpuThreadPoolExecutor" />
</bean>

</beans>


       6.  获取 Spring Bean 。 为了达到更好的代码可维护性, 尽量做到责任分配合理。

package appcontext;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ApplicationContextUtil {

private static ClassPathXmlApplicationContext applicationContext = null;

private ApplicationContextUtil() {}

static {
if (applicationContext == null) {
applicationContext
= new ClassPathXmlApplicationContext("javathreading-spring-base.xml");
}
}

public static ClassPathXmlApplicationContext getApplicationContextInstance() {
if (applicationContext == null) {
return new ClassPathXmlApplicationContext("javathreading-spring-base");
}
return applicationContext;
}

public static Object getBean(String beanId) {
return applicationContext.getBean(beanId);
}

}

     

      7.   使用线程池。 一般在 Service 里完成, 使用 Spring 进行注入。 使用并发的主要目的之一是提高性能, 首先就要确定性能瓶颈, 必须通过测量来完成, 而不是靠臆测。  最开始, 想当然以为匹配所耗的时间最多, 但测量之后发现不是, 而是获取目录的文件列表。 其次, 判断任务类型, 是CPU密集型还是IO 密集型。 CPU 密集型通常不涉及IO 操作, 主要是内存计算; IO 密集型则涉及文件网络操作。 不同的任务类型所适宜使用的线程池配置是大不相同的。

package multithreading.service;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.regex.Pattern;

import multithreading.MyThreadPoolExecutor;
import multithreading.callable.FileExtractCallable;
import multithreading.callable.FileMatchCallable;
import multithreading.customized.FutureTaskWithCallableAvailed;

public class FileSearchService {

private MyThreadPoolExecutor ioThreadPoolExecutor;

private MyThreadPoolExecutor cpuThreadPoolExecutor;

public MyThreadPoolExecutor getIoThreadPoolExecutor() {
return ioThreadPoolExecutor;
}

public void setIoThreadPoolExecutor(MyThreadPoolExecutor ioThreadPoolExecutor) {
this.ioThreadPoolExecutor = ioThreadPoolExecutor;
}

public MyThreadPoolExecutor getCpuThreadPoolExecutor() {
return cpuThreadPoolExecutor;
}

public void setCpuThreadPoolExecutor(MyThreadPoolExecutor cpuThreadPoolExecutor) {
this.cpuThreadPoolExecutor = cpuThreadPoolExecutor;
}

/**
* 搜索匹配文件的串行版本
* 串行版本实现非常简单, 如果性能在接受范围内 应该选择串行版本
* 串行版本可以用于测量和定位性能瓶颈, 检验结果的正确性
* 运行结果表明:
* checkExistAndExtractAllFiles 耗时最大, Get bean 次之, Matching 最少, 因此并发版本一优化 matching time 是毫无意义的.
* Get bean time: 393ms.
* Extracting time: 1495ms.
* Matching time: 38ms.
* File SearchTask Time Cost: 1926ms.
*/
public List<File> searchMatchedFileSequentially(File targetDirectory, Pattern pattern) throws FileNotFoundException {

long start = System.currentTimeMillis();

// 抽取文件任务: IO 密集型
List<File> allExtractedFiles = checkExistAndExtractAllFileSequentially(targetDirectory);
long extractEnd = System.currentTimeMillis();

// 文件匹配任务: CPU 密集型
List<File> resultList = obtainMatchedFileSequentially(allExtractedFiles, pattern);
long matchedEnd = System.currentTimeMillis();

System.out.println(
"Extracting time: " + (extractEnd - start) + "ms.");
System.out.println(
"Matching time: " + (matchedEnd - extractEnd) + "ms.");
return resultList;
}

/**
* 并发搜索匹配文件版本一
* 串行抽取文件, 并发匹配文件, 创建线程开销很大, 没有找准优化目标
* File SearchTask Time Cost: 2884ms.
*/
public List<File> searchMatchedFileConcurrentlyFirst(File targetDirectory, Pattern pattern) throws FileNotFoundException {
List
<File> allExtractedFiles = checkExistAndExtractAllFileSequentially(targetDirectory);
return matchFileConcurrently(allExtractedFiles, pattern);
}

/**
* 并发搜索匹配文件版本二
* 并发抽取文件, 串行匹配文件, 有针对性地优化目标
* File SearchTask Time Cost: 1409ms.
*/
public List<File> searchMatchedFileConcurrentlySecond(File targetDirectory, Pattern pattern) throws FileNotFoundException {
List
<File> allExtractedFiles = checkExistAndExtractAllFileConcurrently(targetDirectory);
return obtainMatchedFileSequentially(allExtractedFiles, pattern);
}

// 串行版本: 检查是否存在待搜索目录并列出所有的非目录文件
private List<File> checkExistAndExtractAllFileSequentially(File targetDirectory) throws FileNotFoundException {

checkExist(targetDirectory);
List
<File> allExtractedFiles = new ArrayList<File>();
extractFileSequentially(targetDirectory, allExtractedFiles);
return allExtractedFiles;
}

// 并发版本: 检查是否存在待搜索目录并列出所有的非目录文件
private List<File> checkExistAndExtractAllFileConcurrently(File targetDirectory) throws FileNotFoundException {

checkExist(targetDirectory);
List
<File> allExtractedFiles = new ArrayList<File>();
extractFileConcurrently(targetDirectory, allExtractedFiles);
return allExtractedFiles;
}

// 并发版本: 列出所有的非目录文件
private List<File> extractFileConcurrently(File targetDirectory, List<File> allExtractedFiles) {
List
<FutureTask<List<File>>> futureTaskList = new ArrayList<FutureTask<List<File>>>();
if (targetDirectory.isFile()) {
allExtractedFiles.add(targetDirectory);
return allExtractedFiles;
}
else if (targetDirectory.isDirectory()){
File[] files
= targetDirectory.listFiles();
for (File file: files) {
FileExtractCallable fileExtractCallable
= new FileExtractCallable(file);
FutureTask
<List<File>> fileExtractFutureTask = new FutureTaskWithCallableAvailed<List<File>>(fileExtractCallable);
futureTaskList.add(fileExtractFutureTask);
ioThreadPoolExecutor.execute(fileExtractFutureTask);
}
}
return MyThreadPoolExecutor.getMultiTaskResult(futureTaskList);
}

// 并发版本: 获取匹配的文件集合
private List<File> matchFileConcurrently(List<File> allExtractedFiles, Pattern pattern) {
List
<FutureTask<List<File>>> futureTaskList = new ArrayList<FutureTask<List<File>>>();
if (allExtractedFiles.size() > 0) {
for (File file: allExtractedFiles) {
FileMatchCallable searchFileCallable
= new FileMatchCallable(file, pattern);
FutureTask
<List<File>> searchFileFutureTask = new FutureTaskWithCallableAvailed<List<File>>(searchFileCallable);
futureTaskList.add(searchFileFutureTask);
cpuThreadPoolExecutor.execute(searchFileFutureTask);
}
}
return MyThreadPoolExecutor.getMultiTaskResult(futureTaskList);
}

// 检查是否存在待搜索目录
private void checkExist(File targetDirectory) throws FileNotFoundException
{
if (!targetDirectory.exists()) {
throw new FileNotFoundException("No such directory: " + targetDirectory);
}
}

// 串行版本: 列出所有的非目录文件
private void extractFileSequentially(File directory, List<File> allExtractedFiles) {
if (directory.isFile()) {
allExtractedFiles.add(directory);
}
else if (directory.isDirectory()) {
File[] files
= directory.listFiles();
for (File dir: files) {
extractFileSequentially(dir, allExtractedFiles);
}
}
}

// 串行版本: 获取匹配的文件集合
private List<File> obtainMatchedFileSequentially(List<File> allExtractedFiles, Pattern pattern) {
List
<File> resultList = new ArrayList<File>();
for (File file: allExtractedFiles) {
boolean matched = pattern.matcher(file.getName()).matches();
if (matched) {
resultList.add(file);
}
}
return resultList;
}



}


       8.  客户端使用。 接口做好了, 客户端使用就容易了。

package multithreading.tasks;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

import multithreading.service.FileSearchService;
import timecost.CommandIF;
import timecost.TimeMeasurer;
import timecost.TimeMeasurerIF;
import appcontext.ApplicationContextUtil;

public class FileSearchTask implements CommandIF {

String directory
= "D:\\csat";
String pattern
= "[\\w]+.java";

public static void main(String[] args) {
TimeMeasurerIF timeMeasurer
= new TimeMeasurer(new FileSearchTask());
long taskTime = timeMeasurer.timecost();
System.out.println(
"File SearchTask Time Cost: " + taskTime + "ms.");
}

public void runTask() {

long start = System.currentTimeMillis();
Pattern patt
= Pattern.compile(pattern);
FileSearchService fileSearchService
= (FileSearchService) ApplicationContextUtil.getBean("fileSearchService");
long getBeanEnd = System.currentTimeMillis();
System.out.println(
"Get bean time: " + (getBeanEnd - start) + "ms.");

List
<File> fileListFound = new ArrayList<File>();
try {
fileListFound
= fileSearchService.searchMatchedFileConcurrentlySecond(new File(directory), patt);
//fileListFound = fileSearchService.searchMatchedFileConcurrentlyFirst(new File(directory), patt);
//fileListFound = fileSearchService.searchMatchedFileSequentially(new File(directory), patt);
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
}
if (fileListFound != null && fileListFound.size() > 0) {
// for (File file: fileListFound) {
// System.out.println("Files Found: " + file.getAbsolutePath());
// }
System.out.println("Total Num Found: " + fileListFound.size());
}
}

}


       9.  计时相关(命令模式)。 实际上, 这种方式还是不够灵活, 当我需要在方法中对任意可能的方法进行测量时, 就不得不将 System.currentMillis 遍布在代码中, 使代码很难看。 初步考虑是否可以使用注解的方式来实现, 即在需要测量时间的方法上加上注解  @timemeasure , 就可以自动地打印该方法的运行时间。

package timecost;

public interface CommandIF {

public void runTask();

}
package timecost;

public interface TimeMeasurerIF {

long timecost();

}
package timecost;

public class TimeMeasurer implements TimeMeasurerIF {

private CommandIF command;

public TimeMeasurer(CommandIF command) {
this.command = command;
}

public long timecost() {
long start = System.currentTimeMillis();
command.runTask();
long end = System.currentTimeMillis();
return end - start;
}

}