為什么要用線程池?
諸如 web 服務器、數據庫服務器、文件服務器或郵件服務器之類的許多服務器應用程序都面臨處理來自某些遠程來源的大量短小的任務。請求以某種方式到達服務器,這種方式可能是通過網絡協議(例如 http、ftp 或 pop)、通過 jms 隊列或者可能通過輪詢數據庫。不管請求如何到達,服務器應用程序中經常出現的情況是:單個任務處理的時間很短而請求的數目卻是巨大的。
只有當任務都是同類型并且相互獨立時,線程池的性能才能達到最佳。如果將運行時間較長的與運行時間較短的任務混合在一起,那么除非線程池很大,否則將可能造成擁塞,如果提交的任務依賴于其他任務,那么除非線程池無線大,否則將可能造成死鎖。
例如饑餓死鎖:線程池中的任務需要無限等待一些必須由池中其他任務才能提供的資源或條件。
threadpoolexecutor的通用構造函數:(在調用完構造函數之后可以繼續定制threadpoolexecutor)
1
2
3
4
5
|
public threadpoolexecutor( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit,blockingqueue<runnable> workqueue,threadfactory threadfactory, rejectedexecutionhandler handler){ //... } |
飽和策略:
threadpoolexecutor允許提供一個blockingqueue來保存等待執行的任務。
當有界隊列被填滿后,飽和策略開始發揮作用。可以通過調用setrejectedexecutionhandler來修改。
中止是默認的飽和策略,該策略將拋出未檢查的rejectedexecutionexception,調用者可以捕獲這個異常,然后根據需求編寫自己的處理代碼。
調用者運行策略實現了一種調節機制,該策略既不會拋棄任務,也不會拋出異常,而是將某些任務回退到調用者,從而降低新任務的流量。
例如對于webserver,當線程池中的所有線程都被占用,并且工作隊列被填滿后,下一個任務在調用execute時在主線程中執行。
由于執行任務需要一定的時間,因此主線程至少在一段時間內不能提交任何任務,從而使得工作者線程有時間來處理完正在執行的任務。
在這期間,主線程不會調用accept,因此到達的請求將被保存在tcp層的隊列中而不是在應用程序的隊列中,如果持續過載,那么tcp層最終發現它的請求隊列被填滿,同樣會開始拋棄請求。
因此當服務器過載時,這種過載會逐漸向外蔓延開來---從線程池到工作隊列到應用程序再到tcp層,最終到達客戶端,導致服務器在高負載下實現一種平緩的性能降低。
1
|
exec.setrejectedexecutionhandler( new threadpoolexecutor.callerrunspolicy()); |
當工作隊列被填滿后,沒有預定于的飽和策略來阻塞execute。而通過semaphore來現在任務的到達率,可以實現。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
/** * 設置信號量的上界設置為線程池的大小加上可排隊任務的數量,控制正在執行和等待執行的任務數量。 */ public class boundedexecutor { private final executor exec; private final semaphore semaphore; public boundedexecutor(executor exec, int bound){ this .exec = exec; this .semaphore = new semaphore(bound); } public void submittask( final runnable task) throws interruptedexception{ semaphore.acquire(); try { exec.execute( new runnable(){ public void run(){ try { task.run(); } finally { semaphore.release(); } } }); } catch (rejectedexecutionexception e){ semaphore.release(); } } } |
線程工廠
線程池配置信息中可以定制線程工廠,在threadfactory中只定義了一個方法newthread,每當線程池需要創建一個新線程時都會調用這個方法。
1
2
3
|
public interface threadfactory{ thread newthread(runnable r); } |
1
2
3
4
5
6
7
8
9
10
11
12
13
|
// 示例:將一個特定于線程池的名字傳遞給mythread的構造函數,從而可以再線程轉儲和錯誤日志信息中區分來自不同線程池的線程。 public class mythreadfactory implements threadfactory{ private final string poolname; public mythreadfactory(string poolname){ this .poolname = poolname; } public thread newthread(runnable runnable){ return new mythread(runnable,poolname); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
// 示例:為線程指定名字,設置自定義uncaughtexceptionhandler向logger中寫入信息及維護一些統計信息以及在線程被創建或者終止時把調試消息寫入日志。 public class mythread extends thread{ public static final string default_name = "mythread" ; private static volatile boolean debuglifecycle = false ; private static final atomicinteger created = new atomicinteger(); private static final atomicinteger alive = new atomicinteger(); private static final logger log = logger.getanonymouslogger(); public mythread(runnable runnable){ this (runnable,default_name); } public mythread(runnable runnable, string defaultname) { super (runnable,defaultname + "-" + created.incrementandget()); setuncaughtexceptionhandler( new thread.uncaughtexceptionhandler() { @override public void uncaughtexception(thread t, throwable e) { log.log(level.severe, "uncaught in thread " + t.getname(), e); } }); } public void run(){ boolean debug = debuglifecycle; if (debug){ log.log(level.fine, "created " + getname()); } try { alive.incrementandget(); super .run(); } finally { alive.decrementandget(); if (debug){ log.log(level.fine, "exiting " + getname()); } } } } |
擴展threadpoolexecutor
在線程池完成關閉操作時調用terminated,也就是在所有任務都已經完成并且所有工作者線程也已經關閉后。terminated可以用來釋放executor在其生命周期里分配的各種資源,此外還可以執行發送通知、記錄日志或者收集finalize統計信息等操作。
示例:給線程池添加統計信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
/** * timingthreadpool中給出了一個自定義的線程池,通過beforeexecute、afterexecute、terminated等方法來添加日志記錄和統計信息收集。 * 為了測量任務的運行時間,beforeexecute必須記錄開始時間并把它保存到一個afterexecute可用訪問的地方。 * 因為這些方法將在執行任務的線程中調用,因此beforeexecute可以把值保存到一個threadlocal變量中。然后由afterexecute來取。 * 在timingthreadpool中使用了兩個atomiclong變量,分別用于記錄已處理的任務和總的處理時間,并通過包含平均任務時間的日志消息。 */ public class timingthreadpool extends threadpoolexecutor{ public timingthreadpool( int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue) { super (corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue); } private final threadlocal< long > starttime = new threadlocal< long >(); private final logger log = logger.getlogger( "timingthreadpool" ); private final atomiclong numtasks = new atomiclong(); private final atomiclong totaltime = new atomiclong(); protected void beforeexecute(thread t,runnable r){ super .beforeexecute(t, r); log.fine(string.format( "thread %s: start %s" , t,r)); starttime.set(system.nanotime()); } protected void afterexecute(throwable t,runnable r){ try { long endtime = system.nanotime(); long tasktime = endtime - starttime.get(); numtasks.incrementandget(); totaltime.addandget(tasktime); log.fine(string.format( "thread %s: end %s, time=%dns" , t,r,tasktime)); } finally { super .afterexecute(r, t); } } protected void terminated(){ try { log.info(string.format( "terminated: avg time=%dns" , totaltime.get()/numtasks.get())); } finally { super .terminated(); } } } |
#筆記內容參考 《java并發編程實戰》
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。
原文鏈接:https://www.cnblogs.com/shanhm1991/p/9899720.html