Fork/Join框架是ExecutorService接口的一個實(shí)現(xiàn),通過它我們可以實(shí)現(xiàn)多進(jìn)程。Fork/Join可以用來將一個大任務(wù)遞歸的拆分為多個小任務(wù),目標(biāo)是充分利用所有的資源盡可能增強(qiáng)應(yīng)用的性能。
和任何ExecutorService接口的實(shí)現(xiàn)一樣,F(xiàn)ork/Join也會使用線程池來分布式的管理工作線程。Fork/Join框架的獨(dú)特之處在于它使用了work-stealing(工作竊取)算法。通過這個算法,工作線程在無事可做時可以竊取其它正在繁忙的線程的任務(wù)來執(zhí)行。
Fork/Join框架的核心是ForkJoinPool類,一個AbstractExecutorService類的子類。ForkJoinPool實(shí)現(xiàn)了核心的work-stealing算法并可以執(zhí)行ForkJoinTask處理。
基礎(chǔ)用法
使用Fork/Join框架的第一步是編寫執(zhí)行碎片任務(wù)的代碼。要編寫的代碼類似如下偽代碼:
1
2
3
4
5
|
if 任務(wù)足夠小: 直接執(zhí)行任務(wù) else: 將任務(wù)切成兩個小任務(wù) 執(zhí)行兩個小任務(wù)并等待結(jié)果 |
使用ForkJoinTask子類來封裝如上的代碼,通常會使用一些JDK提供的類,使用的有RecursiveTask(這個類會返回一個結(jié)果)和RecursiveAction兩個類。
在準(zhǔn)備好ForkJoinTask子類后,創(chuàng)建一個代表所有任務(wù)的對象,并將之傳遞給一個ForkJoinPool實(shí)例的invoke()方法。
由模糊到清晰
為了輔助理解Fork/Join框架是如何工作的,我們使用一個案例來進(jìn)行說明:比如對一張圖片進(jìn)行模糊處理。我們用一個整型數(shù)組表示圖片,其中的每個數(shù)值代表一個像素的顏色。被模糊的圖片也用一個同等長度的數(shù)組來表示。
執(zhí)行模糊是通過對代表圖片的每個像素進(jìn)行處理實(shí)現(xiàn)的。計算每個像素與其周圍像素的均值(紅黃藍(lán)三原色的均值),計算生成的結(jié)果數(shù)組就是模糊后的圖片。由于代表圖像的通常都是一個大數(shù)組,整個處理過程需要通常會需要很多時間。可以使用Fork/Join框架利用多處理器系統(tǒng)上的并發(fā)處理優(yōu)勢來進(jìn)行提速。下面是一個可能的實(shí)現(xiàn):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
package com.zhyea.robin; import java.util.concurrent.RecursiveAction; public class ForkBlur extends RecursiveAction { private int [] mSource; private int mStart; private int mLength; private int [] mDestination; // 處理窗口大小; 需要是一個奇數(shù). private int mBlurWidth = 15 ; public ForkBlur( int [] src, int start, int length, int [] dst) { mSource = src; mStart = start; mLength = length; mDestination = dst; } protected void computeDirectly() { int sidePixels = (mBlurWidth - 1 ) / 2 ; for ( int index = mStart; index < mStart + mLength; index++) { // 計算平均值. float rt = 0 , gt = 0 , bt = 0 ; for ( int mi = -sidePixels; mi <= sidePixels; mi++) { int mindex = Math.min(Math.max(mi + index, 0 ), mSource.length - 1 ); int pixel = mSource[mindex]; rt += ( float ) ((pixel & 0x00ff0000 ) >> 16 ) / mBlurWidth; gt += ( float ) ((pixel & 0x0000ff00 ) >> 8 ) / mBlurWidth; bt += ( float ) ((pixel & 0x000000ff ) >> 0 ) / mBlurWidth; } // 重組目標(biāo)像素. int dpixel = ( 0xff000000 ) | ((( int ) rt) << 16 ) | ((( int ) gt) << 8 ) | ((( int ) bt) << 0 ); mDestination[index] = dpixel; } } .... } |
現(xiàn)在實(shí)現(xiàn)抽象方法compute(),在這個方法中既實(shí)現(xiàn)了模糊操作,也實(shí)現(xiàn)了將一個任務(wù)拆分成兩個小任務(wù)。這里僅是簡單依據(jù)數(shù)組長度來決定是直接執(zhí)行任務(wù)還是將之拆分成兩個小任務(wù):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
protected static int sThreshold = 100000 ; protected void compute() { if (mLength < sThreshold) { computeDirectly(); return ; } int split = mLength / 2 ; invokeAll( new ForkBlur(mSource, mStart, split, mDestination), new ForkBlur(mSource, mStart + split, mLength - split, mDestination)); } |
因為上面這些方法的實(shí)現(xiàn)是定義在RecursiveAction的一個子類中,可以直接在一個ForkJoinPool中創(chuàng)建并運(yùn)行任務(wù)。具體步驟如下:
1. 創(chuàng)建一個代表要執(zhí)行的任務(wù)的對象:
1
2
3
|
// src 表示源圖片像素的數(shù)組 // dst 表示生成的圖片的像素 ForkBlur fb = new ForkBlur(src, 0 , src.length, dst); |
2. 創(chuàng)建一個運(yùn)行任務(wù)的ForkJoinPool實(shí)例:
ForkJoinPool pool = new ForkJoinPool();
3. 運(yùn)行任務(wù):
pool.invoke(fb);
在源代碼中還包含了一些創(chuàng)建目標(biāo)圖片的代碼。具體參考ForkBlur示例。
標(biāo)準(zhǔn)實(shí)現(xiàn)
要使用Fork/Join框架按自定義的算法在多核系統(tǒng)上執(zhí)行并發(fā)任務(wù)當(dāng)然需要實(shí)現(xiàn)自定義的類了(比如之前我們實(shí)現(xiàn)的ForkBlur類)。除此之外,在JavaSE中已經(jīng)在廣泛使用Fork/Join框架的一些特性了。比如Java8中的java.util.Arrays類的parallelSort()方法就使用了Fork/Join框架。具體可以參考Java API文檔。
Fork/Join框架的另一個實(shí)現(xiàn)在java.util.streams包下,這也是java8的Lambda特性的一部分。