Executor框架使用Runnable 作為其基本的任務(wù)表示形式。Runnable是一種有局限性的抽象,然后可以寫入日志,或者共享的數(shù)據(jù)結(jié)構(gòu),但是他不能返回一個(gè)值。
許多任務(wù)實(shí)際上都是存在延遲計(jì)算的:執(zhí)行數(shù)據(jù)庫查詢,從網(wǎng)絡(luò)上獲取資源,或者某個(gè)復(fù)雜耗時(shí)的計(jì)算。對(duì)于這種任務(wù),Callable是一個(gè)更好的抽象,他能返回一個(gè)值,并可能拋出一個(gè)異常。Future表示一個(gè)任務(wù)的周期,并提供了相應(yīng)的方法來判斷是否已經(jīng)完成或者取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; } public interface Future<V> { /** * Attempts to cancel execution of this task. This attempt will * fail if the task has already completed, has already been cancelled, * or could not be cancelled for some other reason. If successful, * and this task has not started when <tt>cancel</tt> is called, * this task should never run. If the task has already started, * then the <tt>mayInterruptIfRunning</tt> parameter determines * whether the thread executing this task should be interrupted in * an attempt to stop the task. * * <p>After this method returns, subsequent calls to {@link #isDone} will * always return <tt>true</tt>. Subsequent calls to {@link #isCancelled} * will always return <tt>true</tt> if this method returned <tt>true</tt>. * * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this * task should be interrupted; otherwise, in-progress tasks are allowed * to complete * @return <tt>false</tt> if the task could not be cancelled, * typically because it has already completed normally; * <tt>true</tt> otherwise */ boolean cancel( boolean mayInterruptIfRunning); /** * Returns <tt>true</tt> if this task was cancelled before it completed * normally. * * @return <tt>true</tt> if this task was cancelled before it completed */ boolean isCancelled(); /** * Returns <tt>true</tt> if this task completed. * * Completion may be due to normal termination, an exception, or * cancellation -- in all of these cases, this method will return * <tt>true</tt>. * * @return <tt>true</tt> if this task completed */ boolean isDone(); /** * Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting */ V get() throws InterruptedException, ExecutionException; /** * Waits if necessary for at most the given time for the computation * to complete, and then retrieves its result, if available. * * @param timeout the maximum time to wait * @param unit the time unit of the timeout argument * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting * @throws TimeoutException if the wait timed out */ V get( long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } |
可以通過多種方法來創(chuàng)建一個(gè)Future來描述任務(wù)。ExecutorService中的submit方法接受一個(gè)Runnable或者Callable,然后返回一個(gè)Future來獲得任務(wù)的執(zhí)行結(jié)果或者取消任務(wù)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
/** * Submits a value-returning task for execution and returns a * Future representing the pending results of the task. The * Future's <tt>get</tt> method will return the task's result upon * successful completion. * * <p> * If you would like to immediately block waiting * for a task, you can use constructions of the form * <tt>result = exec.submit(aCallable).get();</tt> * * <p> Note: The {@link Executors} class includes a set of methods * that can convert some other common closure-like objects, * for example, {@link java.security.PrivilegedAction} to * {@link Callable} form so they can be submitted. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ <T> Future<T> submit(Callable<T> task); /** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's <tt>get</tt> method will * return the given result upon successful completion. * * @param task the task to submit * @param result the result to return * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ <T> Future<T> submit(Runnable task, T result); /** * Submits a Runnable task for execution and returns a Future * representing that task. The Future's <tt>get</tt> method will * return <tt>null</tt> upon <em>successful</em> completion. * * @param task the task to submit * @return a Future representing pending completion of the task * @throws RejectedExecutionException if the task cannot be * scheduled for execution * @throws NullPointerException if the task is null */ Future<?> submit(Runnable task); |
另外ThreadPoolExecutor中的newTaskFor(Callable<T> task) 可以返回一個(gè)FutureTask。
假設(shè)我們通過一個(gè)方法從遠(yuǎn)程獲取一些計(jì)算結(jié)果,假設(shè)方法是 List getDataFromRemote(),如果采用同步的方法,代碼大概是 List data = getDataFromRemote(),我們將一直等待getDataFromRemote返回,然后才能繼續(xù)后面的工作,這個(gè)函數(shù)是從遠(yuǎn)程獲取計(jì)算結(jié)果的,如果需要很長(zhǎng)時(shí)間,后面的代碼又和這個(gè)數(shù)據(jù)沒有什么關(guān)系的話,阻塞在那里就會(huì)浪費(fèi)很多時(shí)間。我們有什么辦法可以改進(jìn)呢???
能夠想到的辦法是調(diào)用函數(shù)后,立即返回,然后繼續(xù)執(zhí)行,等需要用數(shù)據(jù)的時(shí)候,再取或者等待這個(gè)數(shù)據(jù)。具體實(shí)現(xiàn)有兩種方式,一個(gè)是用Future,另一個(gè)是回調(diào)。
1
2
3
|
Future<List> future = getDataFromRemoteByFuture(); //do something.... List data = future.get(); |
可以看到我們返回的是一個(gè)Future對(duì)象,然后接著自己的處理后面通過future.get()來獲得我們想要的值。也就是說在執(zhí)行g(shù)etDataFromRemoteByFuture的時(shí)候,就已經(jīng)啟動(dòng)了對(duì)遠(yuǎn)程計(jì)算結(jié)果的獲取,同時(shí)自己的線程還繼續(xù)執(zhí)行不阻塞。知道獲取時(shí)候再拿數(shù)據(jù)就可以。看一下getDataFromRemoteByFuture的實(shí)現(xiàn):
1
2
3
4
5
6
7
8
9
|
private Future<List> getDataFromRemoteByFuture() { return threadPool.submit( new Callable<List>() { @Override public List call() throws Exception { return getDataFromRemote(); } }); } |
我們?cè)谶@個(gè)方法中調(diào)用getDataFromRemote方法,并且用到了線程池。把任務(wù)加入線程池之后,理解返回Future對(duì)象。Future的get方法,還可以傳入一個(gè)超時(shí)參數(shù),用來設(shè)置等待時(shí)間,不會(huì)一直等下去。
也可以利用FutureTask來獲取結(jié)果:
1
2
3
4
5
6
7
8
9
|
FutureTask<List> futureTask = new FutureTask<List>( new Callable<List>() { @Override public List call() throws Exception { return getDataFromRemote(); } }); threadPool.submit(futureTask); futureTask.get(); |
FutureTask是一個(gè)具體的實(shí)現(xiàn)類,ThreadPoolExecutor的submit方法返回的就是一個(gè)Future的實(shí)現(xiàn),這個(gè)實(shí)現(xiàn)就是FutureTask的一個(gè)具體實(shí)例,F(xiàn)utureTask幫助實(shí)現(xiàn)了具體的任務(wù)執(zhí)行,以及和Future接口中的get方法的關(guān)聯(lián)。
FutureTask除了幫助ThreadPool很好的實(shí)現(xiàn)了對(duì)加入線程池任務(wù)的Future支持外,也為我們提供了很大的便利,使得我們自己也可以實(shí)現(xiàn)支持Future的任務(wù)調(diào)度。
補(bǔ)充知識(shí):多線程中Future與FutureTask的區(qū)別和聯(lián)系
線程的創(chuàng)建方式中有兩種,一種是實(shí)現(xiàn)Runnable接口,另一種是繼承Thread,但是這兩種方式都有個(gè)缺點(diǎn),那就是在任務(wù)執(zhí)行完成之后無法獲取返回結(jié)果,于是就有了Callable接口,F(xiàn)uture接口與FutureTask類的配和取得返回的結(jié)果。
我們先回顧一下java.lang.Runnable接口,就聲明了run(),其返回值為void,當(dāng)然就無法獲取結(jié)果。
1
2
3
|
public interface Runnable { public abstract void run(); } |
而Callable的接口定義如下
1
2
3
|
public interface Callable<V> { V call() throws Exception; } |
該接口聲明了一個(gè)名稱為call()的方法,同時(shí)這個(gè)方法可以有返回值V,也可以拋出異常。嗯,對(duì)該接口我們先了解這么多就行,下面我們來說明如何使用,前篇文章我們說過,無論是Runnable接口的實(shí)現(xiàn)類還是Callable接口的實(shí)現(xiàn)類,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor執(zhí)行,ThreadPoolExecutor或ScheduledThreadPoolExecutor都實(shí)現(xiàn)了ExcutorService接口,而因此Callable需要和Executor框架中的ExcutorService結(jié)合使用,我們先看看ExecutorService提供的方法:
1
2
3
|
<T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); |
第一個(gè)方法:submit提交一個(gè)實(shí)現(xiàn)Callable接口的任務(wù),并且返回封裝了異步計(jì)算結(jié)果的Future。
第二個(gè)方法:submit提交一個(gè)實(shí)現(xiàn)Runnable接口的任務(wù),并且指定了在調(diào)用Future的get方法時(shí)返回的result對(duì)象。(不常用)
第三個(gè)方法:submit提交一個(gè)實(shí)現(xiàn)Runnable接口的任務(wù),并且返回封裝了異步計(jì)算結(jié)果的Future。
因此我們只要?jiǎng)?chuàng)建好我們的線程對(duì)象(實(shí)現(xiàn)Callable接口或者Runnable接口),然后通過上面3個(gè)方法提交給線程池去執(zhí)行即可。還有點(diǎn)要注意的是,除了我們自己實(shí)現(xiàn)Callable對(duì)象外,我們還可以使用工廠類Executors來把一個(gè)Runnable對(duì)象包裝成Callable對(duì)象。Executors工廠類提供的方法如下:
public static Callable<Object> callable(Runnable task)
public static <T> Callable<T> callable(Runnable task, T result)
2.Future<V>接口
Future<V>接口是用來獲取異步計(jì)算結(jié)果的,說白了就是對(duì)具體的Runnable或者Callable對(duì)象任務(wù)執(zhí)行的結(jié)果進(jìn)行獲取(get()),取消(cancel()),判斷是否完成等操作。我們看看Future接口的源碼:
1
2
3
4
5
6
7
|
public interface Future<V> { boolean cancel( boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get( long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } |
方法解析:
V get() :獲取異步執(zhí)行的結(jié)果,如果沒有結(jié)果可用,此方法會(huì)阻塞直到異步計(jì)算完成。
V get(Long timeout , TimeUnit unit) :獲取異步執(zhí)行結(jié)果,如果沒有結(jié)果可用,此方法會(huì)阻塞,但是會(huì)有時(shí)間限制,如果阻塞時(shí)間超過設(shè)定的timeout時(shí)間,該方法將拋出異常。
boolean isDone() :如果任務(wù)執(zhí)行結(jié)束,無論是正常結(jié)束或是中途取消還是發(fā)生異常,都返回true。
boolean isCanceller() :如果任務(wù)完成前被取消,則返回true。
boolean cancel(boolean mayInterruptRunning) :如果任務(wù)還沒開始,執(zhí)行cancel(...)方法將返回false;如果任務(wù)已經(jīng)啟動(dòng),執(zhí)行cancel(true)方法將以中斷執(zhí)行此任務(wù)線程的方式來試圖停止任務(wù),如果停止成功,返回true;當(dāng)任務(wù)已經(jīng)啟動(dòng),執(zhí)行cancel(false)方法將不會(huì)對(duì)正在執(zhí)行的任務(wù)線程產(chǎn)生影響(讓線程正常執(zhí)行到完成),此時(shí)返回false;當(dāng)任務(wù)已經(jīng)完成,執(zhí)行cancel(...)方法將返回false。mayInterruptRunning參數(shù)表示是否中斷執(zhí)行中的線程。
通過方法分析我們也知道實(shí)際上Future提供了3種功能:(1)能夠中斷執(zhí)行中的任務(wù)(2)判斷任務(wù)是否執(zhí)行完成(3)獲取任務(wù)執(zhí)行完成后額結(jié)果。
但是我們必須明白Future只是一個(gè)接口,我們無法直接創(chuàng)建對(duì)象,因此就需要其實(shí)現(xiàn)類FutureTask登場(chǎng)啦。
3.FutureTask類
我們先來看看FutureTask的實(shí)現(xiàn)
1
2
3
4
5
|
public class FutureTask<V> implements RunnableFuture<V> { FutureTask類實(shí)現(xiàn)了RunnableFuture接口,我們看一下RunnableFuture接口的實(shí)現(xiàn): public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); } |
分析:FutureTask除了實(shí)現(xiàn)了Future接口外還實(shí)現(xiàn)了Runnable接口(即可以通過Runnable接口實(shí)現(xiàn)線程,也可以通過Future取得線程執(zhí)行完后的結(jié)果),因此FutureTask也可以直接提交給Executor執(zhí)行。
最后我們給出FutureTask的兩種構(gòu)造函數(shù):
1
2
3
4
|
public FutureTask(Callable<V> callable) { } public FutureTask(Runnable runnable, V result) { } |
4.Callable<V>/Future<V>/FutureTask的使用 (封裝了異步獲取結(jié)果的Future!!!)
通過上面的介紹,我們對(duì)Callable,F(xiàn)uture,F(xiàn)utureTask都有了比較清晰的了解了,那么它們到底有什么用呢?我們前面說過通過這樣的方式去創(chuàng)建線程的話,最大的好處就是能夠返回結(jié)果,加入有這樣的場(chǎng)景,我們現(xiàn)在需要計(jì)算一個(gè)數(shù)據(jù),而這個(gè)數(shù)據(jù)的計(jì)算比較耗時(shí),而我們后面的程序也要用到這個(gè)數(shù)據(jù)結(jié)果,那么這個(gè)時(shí) Callable豈不是最好的選擇?我們可以開設(shè)一個(gè)線程去執(zhí)行計(jì)算,而主線程繼續(xù)做其他事,而后面需要使用到這個(gè)數(shù)據(jù)時(shí),我們?cè)偈褂肍uture獲取不就可以了嗎?下面我們就來編寫一個(gè)這樣的實(shí)例
4.1 使用Callable+Future獲取執(zhí)行結(jié)果
Callable實(shí)現(xiàn)類如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.zejian.Executor; import java.util.concurrent.Callable; /** * @author zejian * @time 2016年3月15日 下午2:02:42 * @decrition Callable接口實(shí)例 */ public class CallableDemo implements Callable<Integer> { private int sum; @Override public Integer call() throws Exception { System.out.println( "Callable子線程開始計(jì)算啦!" ); Thread.sleep( 2000 ); for ( int i= 0 ;i< 5000 ;i++){ sum=sum+i; } System.out.println( "Callable子線程計(jì)算結(jié)束!" ); return sum; } } |
Callable執(zhí)行測(cè)試類如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package com.zejian.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @author zejian * @time 2016年3月15日 下午2:05:43 * @decrition callable執(zhí)行測(cè)試類 */ public class CallableTest { public static void main(String[] args) { //創(chuàng)建線程池 ExecutorService es = Executors.newSingleThreadExecutor(); //創(chuàng)建Callable對(duì)象任務(wù) CallableDemo calTask= new CallableDemo(); //提交任務(wù)并獲取執(zhí)行結(jié)果 Future<Integer> future =es.submit(calTask); //關(guān)閉線程池 es.shutdown(); try { Thread.sleep( 2000 ); System.out.println( "主線程在執(zhí)行其他任務(wù)" ); if (future.get()!= null ){ //輸出獲取到的結(jié)果 System.out.println( "future.get()-->" +future.get()); } else { //輸出獲取到的結(jié)果 System.out.println( "future.get()未獲取到結(jié)果" ); } } catch (Exception e) { e.printStackTrace(); } System.out.println( "主線程在執(zhí)行完成" ); } } |
執(zhí)行結(jié)果:
Callable子線程開始計(jì)算啦!
主線程在執(zhí)行其他任務(wù)
Callable子線程計(jì)算結(jié)束!
future.get()-->12497500
主線程在執(zhí)行完成
4.2 使用Callable+FutureTask獲取執(zhí)行結(jié)果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package com.zejian.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; /** * @author zejian * @time 2016年3月15日 下午2:05:43 * @decrition callable執(zhí)行測(cè)試類 */ public class CallableTest { public static void main(String[] args) { // //創(chuàng)建線程池 // ExecutorService es = Executors.newSingleThreadExecutor(); // //創(chuàng)建Callable對(duì)象任務(wù) // CallableDemo calTask=new CallableDemo(); // //提交任務(wù)并獲取執(zhí)行結(jié)果 // Future<Integer> future =es.submit(calTask); // //關(guān)閉線程池 // es.shutdown(); //創(chuàng)建線程池 ExecutorService es = Executors.newSingleThreadExecutor(); //創(chuàng)建Callable對(duì)象任務(wù) CallableDemo calTask= new CallableDemo(); //創(chuàng)建FutureTask FutureTask<Integer> futureTask= new FutureTask<>(calTask); //執(zhí)行任務(wù) es.submit(futureTask); //關(guān)閉線程池 es.shutdown(); try { Thread.sleep( 2000 ); System.out.println( "主線程在執(zhí)行其他任務(wù)" ); if (futureTask.get()!= null ){ //輸出獲取到的結(jié)果 System.out.println( "futureTask.get()-->" +futureTask.get()); } else { //輸出獲取到的結(jié)果 System.out.println( "futureTask.get()未獲取到結(jié)果" ); } } catch (Exception e) { e.printStackTrace(); } System.out.println( "主線程在執(zhí)行完成" ); } } |
執(zhí)行結(jié)果:
Callable子線程開始計(jì)算啦!
主線程在執(zhí)行其他任務(wù)
Callable子線程計(jì)算結(jié)束!
futureTask.get()-->12497500
主線程在執(zhí)行完成
以上這篇java多線程之Future和FutureTask使用實(shí)例就是小編分享給大家的全部?jī)?nèi)容了,希望能給大家一個(gè)參考,也希望大家多多支持服務(wù)器之家。
原文鏈接:https://blog.csdn.net/weixin_43469563/article/details/108830688