初步了解fork/join框架
fork/join 框架是java7中加入的一個并行任務(wù)框架,可以將任務(wù)分割成足夠小的小任務(wù),然后讓不同的線程來做這些分割出來的小事情,然后完成之后再進(jìn)行join,將小任務(wù)的結(jié)果組裝成大任務(wù)的結(jié)果。下面的圖片展示了這種框架的工作模型:
使用fork/join并行框架的前提是我們的任務(wù)可以拆分成足夠小的任務(wù),而且可以根據(jù)小任務(wù)的結(jié)果來組裝出大任務(wù)的結(jié)果,一個最簡單的例子是使用fork/join框架來求一個數(shù)組中的最大/最小值,這個任務(wù)就可以拆成很多小任務(wù),大任務(wù)就是尋找一個大數(shù)組中的最大/最小值,我們可以將一個大數(shù)組拆成很多小數(shù)組,然后分別求解每個小數(shù)組中的最大/最小值,然后根據(jù)這些任務(wù)的結(jié)果組裝出最后的最大最小值,下面的代碼展示了如何通過fork/join求解數(shù)組的最大值:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
import java.util.concurrent.executionexception; import java.util.concurrent.forkjoinpool; import java.util.concurrent.future; import java.util.concurrent.recursivetask; import java.util.concurrent.timeunit; import java.util.concurrent.timeoutexception; /** * created by hujian06 on 2017/9/28. * * fork/join demo */ public class forkjoindemo { /** * how to find the max number in array by fork/join */ private static class maxnumber extends recursivetask<integer> { private int threshold = 2 ; private int [] array; // the data array private int index0 = 0 ; private int index1 = 0 ; public maxnumber( int [] array, int index0, int index1) { this .array = array; this .index0 = index0; this .index1 = index1; } @override protected integer compute() { int max = integer.min_value; if ((index1 - index0) <= threshold) { for ( int i = index0;i <= index1; i ++) { max = math.max(max, array[i]); } } else { //fork/join int mid = index0 + (index1 - index0) / 2 ; maxnumber lmax = new maxnumber(array, index0, mid); maxnumber rmax = new maxnumber(array, mid + 1 , index1); lmax.fork(); rmax.fork(); int lm = lmax.join(); int rm = rmax.join(); max = math.max(lm, rm); } return max; } } public static void main(string ... args) throws executionexception, interruptedexception, timeoutexception { forkjoinpool pool = new forkjoinpool(); int [] array = { 100 , 400 , 200 , 90 , 80 , 300 , 600 , 10 , 20 ,- 10 , 30 , 2000 , 1000 }; maxnumber task = new maxnumber(array, 0 , array.length - 1 ); future<integer> future = pool.submit(task); system.out.println( "result:" + future.get( 1 , timeunit.seconds)); } } |
可以通過設(shè)置不同的閾值來拆分成小任務(wù),閾值越小代表拆出來的小任務(wù)越多。
工作竊取算法
fork/join在實現(xiàn)上,大任務(wù)拆分出來的小任務(wù)會被分發(fā)到不同的隊列里面,每一個隊列都會用一個線程來消費(fèi),這是為了獲取任務(wù)時的多線程競爭,但是某些線程會提前消費(fèi)完自己的隊列。而有些線程沒有及時消費(fèi)完隊列,這個時候,完成了任務(wù)的線程就會去竊取那些沒有消費(fèi)完成的線程的任務(wù)隊列,為了減少線程競爭,fork/join使用雙端隊列來存取小任務(wù),分配給這個隊列的線程會一直從頭取得一個任務(wù)然后執(zhí)行,而竊取線程總是從隊列的尾端拉取task。
frok/join框架的實現(xiàn)細(xì)節(jié)
在上面的示例代碼中,我們發(fā)現(xiàn)fork/join的任務(wù)是通過forkjoinpool來執(zhí)行的,所以框架的一個核心是任務(wù)的fork和join,然后就是這個forkjoinpool。關(guān)于任務(wù)的fork和join,我們可以想象,而且也是由我們的代碼自己控制的,所以要分析fork/join,那么forkjoinpool最值得研究。
上面的圖片展示了forkjoinpool的類關(guān)系圖,可以看到本質(zhì)上它就是一個executor。在forkjoinpool里面,有兩個特別重要的成員如下:
1
2
|
volatile workqueue[] workqueues; final forkjoinworkerthreadfactory factory; |
workqueues 用于保存向forkjoinpool提交的任務(wù),而具體的執(zhí)行有forkjoinworkerthread執(zhí)行,而forkjoinworkerthreadfactory可以用于生產(chǎn)出forkjoinworkerthread。可以看一些forkjoinworkerthread,可以發(fā)現(xiàn)每一個forkjoinworkerthread會有一個pool和一個workqueue,和我們上面描述的是一致的,每個線程都被分配了一個任務(wù)隊列,而執(zhí)行這個任務(wù)隊列的線程由pool提供。
下面我們看一下當(dāng)我們fork的時候發(fā)生了什么:
1
2
3
4
5
6
7
8
|
public final forkjointask<v> fork() { thread t; if ((t = thread.currentthread()) instanceof forkjoinworkerthread) ((forkjoinworkerthread)t).workqueue.push( this ); else forkjoinpool.common.externalpush( this ); return this ; } |
看上面的fork代碼,可以看到首先取到了當(dāng)前線程,然后判斷是否是我們的forkjoinpool專用線程,如果是,則強(qiáng)制類型轉(zhuǎn)換(向下轉(zhuǎn)換)成forkjoinworkerthread,然后將任務(wù)push到這個線程負(fù)責(zé)的隊列里面去。如果當(dāng)前線程不是forkjoinworkerthread類型的線程,那么就會走else之后的邏輯,大概的意思是首先嘗試將任務(wù)提交給當(dāng)前線程,如果不成功,則使用例外的處理方法,關(guān)于底層實現(xiàn)較為復(fù)雜,和我們使用fork/join關(guān)系也不太大,如果希望搞明白具體原理,可以看源碼。
下面看一下join的流程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
public final v join() { int s; if ((s = dojoin() & done_mask) != normal) reportexception(s); return getrawresult(); } private int dojoin() { int s; thread t; forkjoinworkerthread wt; forkjoinpool.workqueue w; return (s = status) < 0 ? s : ((t = thread.currentthread()) instanceof forkjoinworkerthread) ? (w = (wt = (forkjoinworkerthread)t).workqueue). tryunpush( this ) && (s = doexec()) < 0 ? s : wt.pool.awaitjoin(w, this , 0l) : externalawaitdone(); } final int doexec() { int s; boolean completed; if ((s = status) >= 0 ) { try { completed = exec(); } catch (throwable rex) { return setexceptionalcompletion(rex); } if (completed) s = setcompletion(normal); } return s; } /** * implements execution conventions for recursivetask. */ protected final boolean exec() { result = compute(); return true ; } |
上面展示了主要的調(diào)用鏈路,我們發(fā)現(xiàn)最后落到了我們在代碼里編寫的compute方法,也就是執(zhí)行它,所以,我們需要知道的一點是,fork僅僅是分割任務(wù),只有當(dāng)我們執(zhí)行join的時候,我們的額任務(wù)才會被執(zhí)行。
如何使用fork/join并行框架
前文首先展示了一個求數(shù)組中最大值得例子,然后介紹了“工作竊取算法”,然后分析了fork/join框架的一些細(xì)節(jié),下面才是我們最關(guān)心的,怎么使用fork/join框架呢?
為了使用fork/join框架,我們只需要繼承類recursivetask或者recursiveaction。前者適用于有返回值的場景,而后者適合于沒有返回值的場景。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:http://www.jianshu.com/p/ac9e175662ca