Fork/Join
Java7提供了Fork/Join来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。
类图
Java7提供了ForkJoinPool来支持将一个任务拆分为多个小任务并行计算,再把多个小任务的结果合并成总的计算结果。ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。
ForkJoinPool(int n)创建一个包含n个并行线程的ForkJoinPool
ForkJoinPool()创建一个Runtime.availableProcessors()返回值个数的并行线程。
ForkJoinTask代表一个可以并行、合并的任务,是一个抽象类,它还有两个抽象子类:RecursiveAction和RecursiveTask。其中RecursiveTask代表有返回值的任务,而RecursiveAction代表没有返回值的任务。
RecursiveAction实例:
package org.github.lujiango; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit; class PrintTask extends RecursiveAction {
private static final long serialVersionUID = 1L;
private static final int threshold = 50;
private int start;
private int end; public PrintTask(int start, int end) {
this.start = start;
this.end = end;
} @Override
protected void compute() {
if (end - start < threshold) {
for (int i = start; i < end; i++) {
System.out.println(Thread.currentThread().getName() + " i: " + i);
}
} else {
int middle = (start + end) / 2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
left.fork();
right.fork();
}
} } public class Test20 { public static void main(String[] args) throws InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
pool.submit(new PrintTask(0, 300));
pool.awaitTermination(2, TimeUnit.SECONDS);
pool.shutdown();
} }
分解后的任务分别调用fork()方法开始并行执行。
RecursiveTask<T>实例:
如果大任务是有返回值的任务,则可以让任务继承RecursiveTask<T>,其中泛型参数T就代表了该任务的返回值类型。
package org.github.lujiango; import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask; class CalTask extends RecursiveTask<Integer> {
private static final long serialVersionUID = 1L;
private static final int threshold = 20;
private int[] arr;
private int start;
private int end; public CalTask(int[] arr, int start, int end) {
this.arr = arr;
this.start = start;
this.end = end;
} @Override
protected Integer compute() {
int sum = 0;
if (end - start < threshold) {
for (int i = start; i < end; i++) {
sum += arr[i];
}
return sum;
} else {
int middle = (start + end) / 2;
CalTask left = new CalTask(arr, start, middle);
CalTask right = new CalTask(arr, middle, end);
left.fork();
right.fork();
return left.join() + right.join();
}
} } public class Test21 { public static void main(String[] args) throws InterruptedException, ExecutionException {
int[] arr = new int[100];
Random random = new Random();
int total = 0;
for (int i = 0, len = arr.length; i < len; i++) {
int tmp = random.nextInt(20);
total += (arr[i] = tmp);
}
System.out.println(total); ForkJoinPool pool = new ForkJoinPool();
Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length));
System.out.println(future.get());
pool.shutdown(); } }