[Translation 21] Java Concurrency: Fork/Join

Time: 2021-06-16 09:56:04

Fork/Join


This section was updated to reflect features and conventions of the upcoming Java SE 8 release. You can download the current JDK 8 snapshot from java.net.

The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.

As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.

The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.
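Because ForkJoinPool is an ExecutorService, it can also be handed ordinary tasks through that interface. The following is a minimal sketch of that relationship, not part of the tutorial; the class name PoolAsExecutorDemo and the method runOnPool are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical demo class: treats a ForkJoinPool as a plain ExecutorService.
class PoolAsExecutorDemo {

    // Submits a Callable that adds two numbers and waits for its result.
    static int runOnPool(int a, int b) {
        // The no-argument constructor sizes the pool to the number of available processors.
        ForkJoinPool pool = new ForkJoinPool();
        try {
            ExecutorService executor = pool; // legal because ForkJoinPool extends AbstractExecutorService
            Future<Integer> result = executor.submit(() -> a + b);
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Result: " + runOnPool(20, 22));
    }
}
```

Work stealing only pays off when tasks split themselves into subtasks, which is what the ForkJoinTask subclasses described next are for.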

Basic Use

The first step for using the fork/join framework is to write code that performs a segment of the work. Your code should look similar to the following pseudocode:

if (my portion of the work is small enough)
    do the work directly
else
    split my work into two pieces
    invoke the two pieces and wait for the results

Wrap this code in a ForkJoinTask subclass, typically using one of its more specialized types, either RecursiveTask (which can return a result) or RecursiveAction.

After your ForkJoinTask subclass is ready, create the object that represents all the work to be done and pass it to the invoke() method of a ForkJoinPool instance.
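As an illustration of the pseudocode above, here is a minimal sketch of a RecursiveTask that sums an array. It is not taken from the tutorial: the class name SumTask, the helper sum(), and the THRESHOLD value of 1,000 are assumptions made for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums values[start, end) by recursive splitting.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cutoff, chosen arbitrarily

    private final long[] values;
    private final int start;
    private final int end; // exclusive

    SumTask(long[] values, int start, int end) {
        this.values = values;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        // Small enough: do the work directly.
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += values[i];
            }
            return sum;
        }
        // Otherwise split into two pieces and wait for both results.
        int mid = start + (end - start) / 2;
        SumTask left = new SumTask(values, start, mid);
        SumTask right = new SumTask(values, mid, end);
        left.fork();                      // schedule the left half asynchronously
        long rightSum = right.compute();  // compute the right half in this thread
        return left.join() + rightSum;    // wait for the left half and combine
    }

    // Creates the root task and passes it to invoke() on a ForkJoinPool.
    static long sum(long[] values) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(values, 0, values.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling SumTask.sum() on an array holding 1 through 10,000 returns 50005000. A RecursiveAction follows the same shape but returns nothing from compute().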

Blurring for Clarity

To help you understand how the fork/join framework works, consider the following example. Suppose that you want to blur an image. The original source image is represented by an array of integers, where each integer contains the color values for a single pixel. The blurred destination image is also represented by an integer array with the same size as the source.

Performing the blur is accomplished by working through the source array one pixel at a time. Each pixel is averaged with its surrounding pixels (the red, green, and blue components are averaged), and the result is placed in the destination array. Since an image is a large array, this process can take a long time. You can take advantage of concurrent processing on multiprocessor systems by implementing the algorithm using the fork/join framework. Here is one possible implementation:

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    // Processing window size; should be odd.
    private int mBlurWidth = 15;

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                      mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }

            // Reassemble destination pixel.
            int dpixel = (0xff000000) |
                         (((int)rt) << 16) |
                         (((int)gt) <<  8) |
                         (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }

    ...
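The masks and shifts in computeDirectly() unpack the red, green, and blue components from a packed integer pixel and pack the averaged values back. The sketch below isolates that arithmetic. The class PixelChannels and its methods are hypothetical helpers written for this explanation; they do not appear in the ForkBlur example.

```java
// Hypothetical helper class isolating the per-channel arithmetic used by the blur.
class PixelChannels {

    static int red(int argb)   { return (argb & 0x00ff0000) >> 16; }
    static int green(int argb) { return (argb & 0x0000ff00) >> 8; }
    static int blue(int argb)  { return argb & 0x000000ff; }

    // Rebuilds a fully opaque pixel from three 0-255 components.
    static int pack(int r, int g, int b) {
        return 0xff000000 | (r << 16) | (g << 8) | b;
    }

    // Averages several pixels channel by channel, as the blur does over its window.
    static int average(int... pixels) {
        float rt = 0, gt = 0, bt = 0;
        for (int pixel : pixels) {
            rt += (float) red(pixel) / pixels.length;
            gt += (float) green(pixel) / pixels.length;
            bt += (float) blue(pixel) / pixels.length;
        }
        return pack((int) rt, (int) gt, (int) bt);
    }
}
```

For instance, averaging a black pixel with pack(200, 100, 50) yields pack(100, 50, 25). Averaging each channel separately matters: averaging the packed integers as whole numbers would let one channel's bits spill into its neighbor.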

Now you implement the abstract compute() method, which either performs the blur directly or splits it into two smaller tasks. A simple array length threshold helps determine whether the work is performed or split.

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }

    int split = mLength / 2;

    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}
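To see how the threshold controls task granularity, the following sketch applies the same splitting rule as compute() but only counts the pieces that would be processed directly. SplitCounter and countLeaves are hypothetical names introduced for this illustration, and the guard against thresholds below 2 is an addition that the tutorial code does not have.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical task: splits like ForkBlur.compute() and counts the directly computed pieces.
class SplitCounter extends RecursiveAction {
    private final int length;
    private final int threshold;
    private final AtomicInteger leaves;

    SplitCounter(int length, int threshold, AtomicInteger leaves) {
        this.length = length;
        this.threshold = threshold;
        this.leaves = leaves;
    }

    @Override
    protected void compute() {
        if (length < threshold) {
            leaves.incrementAndGet(); // stands in for computeDirectly()
            return;
        }
        int split = length / 2;
        invokeAll(new SplitCounter(split, threshold, leaves),
                  new SplitCounter(length - split, threshold, leaves));
    }

    // Returns how many pieces an array of the given length is divided into.
    static int countLeaves(int length, int threshold) {
        if (threshold < 2) {
            // With a threshold below 2, a piece of length 1 would split forever.
            throw new IllegalArgumentException("threshold must be at least 2");
        }
        AtomicInteger leaves = new AtomicInteger();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new SplitCounter(length, threshold, leaves));
        } finally {
            pool.shutdown();
        }
        return leaves.get();
    }
}
```

With a length of 1,000,000, a threshold of 100,000 produces 16 pieces of 62,500 elements each, while a threshold of 10,000 produces 128 pieces. A lower threshold gives idle workers more tasks to steal, at the cost of more task-creation overhead.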

If the previous methods are in a subclass of the RecursiveAction class, then setting up the task to run in a ForkJoinPool is straightforward, and involves the following steps:

  1. Create a task that represents all of the work to be done.

    // source image pixels are in src
    // destination image pixels are in dst
    ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

  2. Create the ForkJoinPool that will run the task.

    ForkJoinPool pool = new ForkJoinPool();

  3. Run the task.

    pool.invoke(fb);

For the full source code, including some extra code that creates the destination image file, see the ForkBlur example.
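The three steps can be tried without an image file. The sketch below is a simplified stand-in for ForkBlur, under stated assumptions: it blurs plain integer values rather than packed pixels, uses a window of 3, and takes the threshold as a constructor argument. The names MiniBlur, blurRange, blurParallel, and blurSequential are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical simplified blur over plain int values (not packed pixels).
class MiniBlur extends RecursiveAction {
    private static final int WINDOW = 3; // assumed window size; should be odd

    private final int[] src;
    private final int start;
    private final int length;
    private final int[] dst;
    private final int threshold;

    MiniBlur(int[] src, int start, int length, int[] dst, int threshold) {
        this.src = src;
        this.start = start;
        this.length = length;
        this.dst = dst;
        this.threshold = threshold;
    }

    // Averages each value with its neighbors, clamping indices at the array edges.
    static void blurRange(int[] src, int start, int length, int[] dst) {
        int side = (WINDOW - 1) / 2;
        for (int i = start; i < start + length; i++) {
            int total = 0;
            for (int k = -side; k <= side; k++) {
                total += src[Math.min(Math.max(i + k, 0), src.length - 1)];
            }
            dst[i] = total / WINDOW;
        }
    }

    @Override
    protected void compute() {
        if (length < threshold) {
            blurRange(src, start, length, dst);
            return;
        }
        int split = length / 2;
        invokeAll(new MiniBlur(src, start, split, dst, threshold),
                  new MiniBlur(src, start + split, length - split, dst, threshold));
    }

    static int[] blurParallel(int[] src, int threshold) {
        if (threshold < 2) {
            throw new IllegalArgumentException("threshold must be at least 2");
        }
        int[] dst = new int[src.length];
        MiniBlur task = new MiniBlur(src, 0, src.length, dst, threshold); // 1. create the task
        ForkJoinPool pool = new ForkJoinPool();                           // 2. create the pool
        try {
            pool.invoke(task);                                            // 3. run the task
        } finally {
            pool.shutdown();
        }
        return dst;
    }

    static int[] blurSequential(int[] src) {
        int[] dst = new int[src.length];
        blurRange(src, 0, src.length, dst);
        return dst;
    }
}
```

Each subtask reads the shared source array and writes only its own slice of the destination array, so the subtasks need no locking, and the parallel result matches the sequential one for any threshold.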

Standard Implementations

Besides using the fork/join framework to implement custom algorithms for tasks to be performed concurrently on a multiprocessor system (such as the ForkBlur.java example in the previous section), there are some generally useful features in Java SE which are already implemented using the fork/join framework. One such implementation, introduced in Java SE 8, is used by the java.util.Arrays class for its parallelSort() methods. These methods are similar to sort(), but leverage concurrency via the fork/join framework. Parallel sorting of large arrays is faster than sequential sorting when run on multiprocessor systems. However, how exactly the fork/join framework is leveraged by these methods is outside the scope of the Java Tutorials. For this information, see the Java API documentation.
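As a brief illustration, the sketch below sorts copies of the same random array with Arrays.sort() and Arrays.parallelSort() and checks that the results agree. The class name ParallelSortDemo and the method sameResult are hypothetical, and the sketch makes no claim about timing, which depends on the array size and the machine.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical demo: parallelSort() is called just like sort().
class ParallelSortDemo {

    // Sorts two copies of the same random data and compares the outcomes.
    static boolean sameResult(int size, long seed) {
        int[] data = new Random(seed).ints(size).toArray();
        int[] sequential = data.clone();
        int[] parallel = data.clone();
        Arrays.sort(sequential);
        Arrays.parallelSort(parallel);
        return Arrays.equals(sequential, parallel);
    }

    public static void main(String[] args) {
        System.out.println("Same result: " + sameResult(1_000_000, 42L));
    }
}
```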

Another implementation of the fork/join framework is used by methods in the java.util.stream package, which is part of Project Lambda, scheduled for the Java SE 8 release. For more information, see the Lambda Expressions section.
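For a sense of what this looks like in practice, here is a minimal sketch of a parallel stream. In the reference JDK implementation, parallel stream operations run on the common ForkJoinPool by default; the class name ParallelStreamDemo and the method sumOfSquares are hypothetical.

```java
import java.util.stream.LongStream;

// Hypothetical demo: a parallel stream splits the range and combines partial sums.
class ParallelStreamDemo {

    // Returns 1^2 + 2^2 + ... + n^2, computed in parallel.
    static long sumOfSquares(long n) {
        return LongStream.rangeClosed(1, n)
                         .parallel()
                         .map(x -> x * x)
                         .sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // 385
    }
}
```

The stream library does the splitting and joining itself, so no explicit ForkJoinTask subclass is needed for this kind of computation.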


Translation:
Fork/Join


  This section reflects new features of the upcoming Java SE 8 release. You can download the current JDK 8 snapshot from java.net.


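  The fork/join framework is an implementation of the ExecutorService interface that helps you make effective use of multiple processors. It is designed for work that can be broken down recursively into smaller pieces. The goal is to use all available processing power to improve your application's performance.
  Like any ExecutorService implementation, the fork/join framework distributes tasks to the worker threads in a thread pool. What sets it apart is its work-stealing algorithm: worker threads that run out of work can steal tasks from other threads that are still busy.
  At the center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask tasks.
Basic Use
  The first step in using the fork/join framework is to write code that performs a segment of the work. Your code should look something like the following pseudocode: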

if (my portion of the work is small enough)
    do the work directly
else
    split my work into two pieces
    invoke the two pieces and wait for the results

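  Wrap this code in a ForkJoinTask subclass, typically one of its more specialized types: RecursiveTask (which can return a result) or RecursiveAction.
  Once your ForkJoinTask subclass is ready, create the object that represents all the work to be done and pass it to the invoke() method of a ForkJoinPool instance.
Blurring for Clarity (a truly puzzling title!)
  To help you understand how the fork/join framework works, consider the following example. Suppose you want to blur an image. The source image is represented by an array of integers, where each integer holds the color values of a single pixel. The blurred destination image is also represented by an integer array of the same size as the source. The blur works through the source array one pixel at a time: each pixel is averaged with its surrounding pixels (the red, green, and blue components are averaged separately), and the result is placed in the destination array. Because an image is a large array, this can take a long time. On a multiprocessor system you can benefit from concurrent processing by implementing the algorithm with the fork/join framework. Here is one possible implementation: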

public class ForkBlur extends RecursiveAction {
    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;

    // Processing window size; should be odd.
    private int mBlurWidth = 15;

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0),
                                      mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float)((pixel & 0x00ff0000) >> 16)
                      / mBlurWidth;
                gt += (float)((pixel & 0x0000ff00) >>  8)
                      / mBlurWidth;
                bt += (float)((pixel & 0x000000ff) >>  0)
                      / mBlurWidth;
            }

            // Reassemble destination pixel.
            int dpixel = (0xff000000) |
                         (((int)rt) << 16) |
                         (((int)gt) <<  8) |
                         (((int)bt) <<  0);
            mDestination[index] = dpixel;
        }
    }

    ...

  

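  Now implement the abstract compute() method, which either performs the blur directly or splits it into two smaller tasks. A simple array-length threshold determines whether the work is performed directly or split.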

protected static int sThreshold = 100000;

protected void compute() {
    if (mLength < sThreshold) {
        computeDirectly();
        return;
    }

    int split = mLength / 2;

    invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
              new ForkBlur(mSource, mStart + split, mLength - split,
                           mDestination));
}

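  If the previous methods are in a subclass of RecursiveAction, setting up the task to run in a ForkJoinPool is straightforward. It takes the following steps:
  1. Create a task that represents all of the work to be done.
  2. Create the ForkJoinPool that will run the task.
  3. Run the task.
  For the full source code, including some extra code that creates the destination image file, see the ForkBlur example: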

import java.awt.image.BufferedImage;
import java.io.File;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import javax.imageio.ImageIO;

/**
 * ForkBlur implements a simple horizontal image blur. It averages pixels in the
 * source array and writes them to a destination array. The sThreshold value
 * determines whether the blurring will be performed directly or split into two
 * tasks.
 *
 * This is not the recommended way to blur images; it is only intended to
 * illustrate the use of the Fork/Join framework.
 */
public class ForkBlur extends RecursiveAction {

    private int[] mSource;
    private int mStart;
    private int mLength;
    private int[] mDestination;
    private int mBlurWidth = 15; // Processing window size, should be odd.

    public ForkBlur(int[] src, int start, int length, int[] dst) {
        mSource = src;
        mStart = start;
        mLength = length;
        mDestination = dst;
    }

    // Average pixels from source, write results into destination.
    protected void computeDirectly() {
        int sidePixels = (mBlurWidth - 1) / 2;
        for (int index = mStart; index < mStart + mLength; index++) {
            // Calculate average.
            float rt = 0, gt = 0, bt = 0;
            for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                int mindex = Math.min(Math.max(mi + index, 0), mSource.length - 1);
                int pixel = mSource[mindex];
                rt += (float) ((pixel & 0x00ff0000) >> 16) / mBlurWidth;
                gt += (float) ((pixel & 0x0000ff00) >> 8) / mBlurWidth;
                bt += (float) ((pixel & 0x000000ff) >> 0) / mBlurWidth;
            }

            // Re-assemble destination pixel.
            int dpixel = (0xff000000)
                    | (((int) rt) << 16)
                    | (((int) gt) << 8)
                    | (((int) bt) << 0);
            mDestination[index] = dpixel;
        }
    }

    protected static int sThreshold = 10000;

    @Override
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }

        int split = mLength / 2;

        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                new ForkBlur(mSource, mStart + split, mLength - split,
                        mDestination));
    }

    // Plumbing follows.
    public static void main(String[] args) throws Exception {
        String srcName = "red-tulips.jpg";
        File srcFile = new File(srcName);
        BufferedImage image = ImageIO.read(srcFile);

        System.out.println("Source image: " + srcName);

        BufferedImage blurredImage = blur(image);

        String dstName = "blurred-tulips.jpg";
        File dstFile = new File(dstName);
        ImageIO.write(blurredImage, "jpg", dstFile);

        System.out.println("Output image: " + dstName);
    }

    public static BufferedImage blur(BufferedImage srcImage) {
        int w = srcImage.getWidth();
        int h = srcImage.getHeight();

        int[] src = srcImage.getRGB(0, 0, w, h, null, 0, w);
        int[] dst = new int[src.length];

        System.out.println("Array size is " + src.length);
        System.out.println("Threshold is " + sThreshold);

        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println(Integer.toString(processors) + " processor"
                + (processors != 1 ? "s are " : " is ")
                + "available");

        ForkBlur fb = new ForkBlur(src, 0, src.length, dst);

        ForkJoinPool pool = new ForkJoinPool();

        long startTime = System.currentTimeMillis();
        pool.invoke(fb);
        long endTime = System.currentTimeMillis();

        System.out.println("Image blur took " + (endTime - startTime) +
                " milliseconds.");

        // Changed from TYPE_INT_ARGB: JPEG has no alpha channel, and recent
        // JDKs may refuse to write an ARGB image as "jpg".
        BufferedImage dstImage =
                new BufferedImage(w, h, BufferedImage.TYPE_INT_RGB);
        dstImage.setRGB(0, 0, w, h, dst, 0, w);

        return dstImage;
    }
}

  

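Standard Implementations
  Besides using the fork/join framework to implement custom algorithms for tasks that run concurrently on a multiprocessor system (such as the ForkBlur.java example in the previous section), some generally useful features in Java SE are already implemented with the fork/join framework. One of them, introduced in Java SE 8, is used by the parallelSort() methods of the java.util.Arrays class. These methods resemble sort() but run concurrently by way of the fork/join framework. On multiprocessor systems, parallel sorting of large arrays is faster than sequential sorting. However, exactly how these methods use the fork/join framework is beyond the scope of the Java Tutorials. For more information, see the Java API documentation.
  Another implementation of the fork/join framework is used by methods in the java.util.stream package, which is part of Project Lambda, scheduled for the Java SE 8 release. For more information, see the Lambda Expressions section.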