1.簡介
使用線程池可以避免線程的頻繁創建以及銷毀。
java中提供的用于實現線程池的api:
executor、executorservice、abstractexecutorservice、threadpoolexecutor、forkjoinpool都位于java.util.concurrent包下。
*threadpoolexecutor、forkjoinpool為線程池的實現類。
2.executor
1
2
3
4
5
6
7
8
|
public interface executor { /** * 向線程池提交一個任務,交由線程池去執行 */ void execute(runnable command); } |
*該接口聲明了execute(runnable command)方法,負責向線程池中提交一個任務。
3.executorservice接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
public interface executorservice extends executor { /** * 關閉線程池(等待隊列中的任務被執行完畢) */ void shutdown(); /** * 立刻關閉線程池(不執行隊列中的任務,并嘗試中斷當前執行的任務) */ list<runnable> shutdownnow(); /** * 判斷線程池是否處于shutdown狀態. */ boolean isshutdown(); /** * 判斷線程池是否處于terminated狀態. */ boolean isterminated(); /** * 若在指定時間內線程池處于terminated狀態則立即返回true,否則超過時間后仍未為terminated狀態則返回false. */ boolean awaittermination( long timeout, timeunit unit) throws interruptedexception; /** * 向線程池提交一個任務并返回包含指定類型的future(根據callable的泛型) */ <t> future<t> submit(callable<t> task); /** * 向線程池提交一個任務并指定任務執行結果的類型,返回包含指定類型的future. */ <t> future<t> submit(runnable task, t result); /** * 向線程池提交一個任務并返回未知類型的future. */ future<?> submit(runnable task); /** * 向線程池提交多個任務并返回指定類型的future列表. */ <t> list<future<t>> invokeall(collection<? extends callable<t>> tasks) throws interruptedexception; /** * 向線程池提交多個任務并返回指定類型的future列表,如果在指定時間內沒有執行完畢則直接返回. */ <t> list<future<t>> invokeall(collection<? extends callable<t>> tasks, long timeout, timeunit unit) throws interruptedexception; /** * 向線程池提交多個任務,當任意一個任務執行完畢后返回指定類型的future. */ <t> t invokeany(collection<? extends callable<t>> tasks) throws interruptedexception, executionexception; /** * 向線程池提交多個任務,在指定時間內,當任意一個任務執行完畢后返回指定類型的future,若超時則拋出異常. */ <t> t invokeany(collection<? extends callable<t>> tasks, long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception; } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public interface future<v> { /** * 中斷任務的執行 */ boolean cancel( boolean mayinterruptifrunning); /** * 判斷任務是否中斷成功 */ boolean iscancelled(); /** * 判斷任務是否執行完成 */ boolean isdone(); /** * 獲取任務的執行結果直到任務執行完畢(阻塞線程) */ v get() throws interruptedexception, executionexception; /** * 獲取任務的執行結果,若在指定時間內任務仍然沒有執行完畢則拋出timeoutexception */ v get( long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception; } |
*execute()方法不能獲取任務的執行結果,而submit()方法能夠根據返回的future實例獲取任務的執行結果。
4.threadpoolexecutor
corepoolsize:線程池中核心線程的數量。
maximumpoolsize:線程池中最大線程數。
keepalivetime:線程的空閑時間。
unit:修飾線程空閑時間的單位。
workqueue:任務隊列。
threadfactory:線程工廠,用于創建線程。
handler:當隊列已滿且當前線程數已達到所允許的最大值時的處理策略。
*線程池中的線程包括核心線程以及普通線程,核心線程一旦創建后直到線程池被關閉前都就不會被銷毀,而普通線程會因為到達空閑時間而被銷毀。
構造方法:
1
2
3
4
5
6
7
|
public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) |
blockingqueue的類型
blockingqueue提供了arrayblockingqueue、linkedblockingqueue、synchronousqueue等實現類。
1.arrayblockingqueue:使用順序表的結構進行存儲,在使用時需要指定其長度,支持公平鎖/非公平鎖進行操作。
2.linkedblockingqueue:使用鏈表的結構進行存儲,在使用時不需要指定其長度,隊列的最大長度為integer.max_value。
3.synchronousqueue:一個不存儲元素的隊列,每一個put操作必須等待take操作,否則不能添加元素,支持公平鎖和非公平鎖。
*這些實現類在進行入隊和出隊操作時都會進行加鎖,以保證在多線程并發訪問時數據的安全性。
隊列已滿且線程數已達到所允許的最大值時的處理策略
rejectedexecutionhandler提供了abortpolicy、discardpolicy、discardolderstpolicy、callerrunspolicy四個策略,這四個策略都是threadpoolexecutor的靜態內部類。
1.abortpolicy:放棄任務并拋出rejectedexecutionexception異常。
2.discardpolicy:放棄任務但不拋出異常。
3.discardolderstpolicy: 放棄隊頭中的任務,然后重新嘗試執行新任務。
4.callerrunspolicy: 由調用線程來處理該任務。
線程池的狀態
1
2
3
4
5
|
private static final int running = - 1 ; private static final int shutdown = 0 ; private static final int stop = 1 ; private static final int tidying = 2 ; private static final int terminated = 3 ; |
1.runing:線程池處于運行狀態,此時可以接受新的任務請求,并且執行隊列中的任務。
2.shutdown:線程池處于關閉狀態,此時不接受新的任務請求,但會繼續執行隊列中的任務。
3.stop:線程池處于禁用狀態,此時不接受新的任務請求,并且不會執行隊列中的任務。
4.tidying:線程池處于整理狀態,此時沒有正在執行的任務。
5.terminated :線程池處于終止狀態。
線程池狀態的變化過程
1.當線程池創建后處于running狀態。
2.1 若此時調用了shutdown()方法,那么線程池將處于shutdown狀態,不接受新的任務請求,但會繼續執行隊列中的任務,當隊列中的任務為空且沒有正在執行的任務時,線程池的狀態為tidying。
2.2 若此時調用了shutdownnow()方法,那么線程池將處于stop狀態,不接受新的任務請求并且不執行隊列中的任務,此時線程池的狀態為tidying。
3.當線程池的狀態為tidying時,當terminated()方法處理完畢后,線程池的狀態為trrminated。
任務的執行流程
1.當調用了execute()或者submit()方法向線程池提交一個任務后,首先判斷當前線程池中的線程個數是否大于核心線程數。
2.如果當前線程池的線程個數小于核心線程數,則創建一個核心線程來處理任務。
3.如果當前線程池的線程個數大于核心線程數,則將任務放入到隊列中,如果放入隊列成功,那么該任務將等待被空閑的線程處理,如果放入隊列失敗(隊滿),則判斷當前線程池中的線程個數是否達到所允許的最大值,若未達到則創建一個普通線程去處理任務,否則根據預定義的處理策略去進行處理。
5.executors工具類
java中提供了executors工具類,用于直接創建executor。
cachethreadpool
1
2
3
|
public static executorservice newcachedthreadpool() { return new threadpoolexecutor( 0 , integer.max_value,60l, timeunit.seconds, new synchronousqueue<runnable>()); } |
cachethreadpool創建的都是普通線程(其核心線程數為0)、線程池的最大線程數為integer.max_value、線程的空閑時間為60秒,此方式適合大量耗時短的任務、不適合大量耗時長的任務。
*由于創建的都是普通線程,且空閑時間為60秒,則仍有可能會頻繁的創建線程。
fixedthreadpool
1
2
3
|
public static executorservice newfixedthreadpool( int nthreads) { return new threadpoolexecutor(nthreads, nthreads,0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); } |
fixedthreadpool創建的都是核心線程,其線程個數由入參決定,線程不會因為空閑時間而被銷毀,適合預知任務數量的業務。
singlethreadexecutor
1
2
3
|
public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice( new threadpoolexecutor( 1 , 1 , new linkedblockingqueue<runnable>())); } |
singlethreadexecutor使用一個核心線程來處理任務。
scheduledthreadpool
1
2
3
|
public static scheduledexecutorservice newscheduledthreadpool( int corepoolsize) { return new scheduledthreadpoolexecutor(corepoolsize); } |
*scheduledthreadpool支持定時執行任務以及固定間隔執行任務。
singlethreadscheduledexecutor
1
2
3
|
public static scheduledexecutorservice newsinglethreadscheduledexecutor() { return new delegatedscheduledexecutorservice( new scheduledthreadpoolexecutor( 1 )); } |
*singlethreadscheduledexecutor支持一個線程的定時執行任務以及固定間隔執行任務。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public interface scheduledexecutorservice extends executorservice { /** * 在指定的延遲時間到達后執行任務一次 */ public scheduledfuture<?> schedule(runnable command, long delay, timeunit unit); /** * 在指定的延遲時間到達后執行任務一次 */ public <v> scheduledfuture<v> schedule(callable<v> callable, long delay, timeunit unit); /** * 在指定的初始化延遲時間到達后執行任務一次,往后每隔period時間執行任務一次. */ public scheduledfuture<?> scheduleatfixedrate(runnable command, long initialdelay, long period,timeunit unit); /** * 在指定的初始化延遲時間到達后執行任務一次,往后每次任務執行完畢后相隔delay時間執行任務一次. */ public scheduledfuture<?> schedulewithfixeddelay(runnable command, long initialdelay, long delay,timeunit unit); } |
workstealingpool
1
2
3
|
public static executorservice newworkstealingpool( int parallelism) { return new forkjoinpool(parallelism,forkjoinpool.defaultforkjoinworkerthreadfactory, null , true ); } |
workstealingpool創建一個并行級別的線程池,同一時刻最多只能有指定個數個線程正在執行任務,創建時直接指定同一時刻最多能允許的并行執行的線程個數即可,如果不傳則使用cpu的核數。
newworkstealingpool方法內部返回一個forkjoinpool實例,forkjoinpool是java7新提供的線程池,同樣繼承abstactexecutorservice。
*作用類似于semaphore。
以上所述是小編給大家介紹的java中的線程池詳解整合,希望對大家有所幫助,如果大家有任何疑問請給我留言,小編會及時回復大家的。在此也非常感謝大家對服務器之家網站的支持!
原文鏈接:https://www.cnblogs.com/funyoung/p/10530986.html