前言
很多業(yè)務(wù)場景需要使用異步去完成,比如:發(fā)送短信通知。要完成異步操作一般有兩種:
- 1、消息隊(duì)列MQ
- 2、線程池處理。
我們來看看Spring框架中如何去使用線程池來完成異步操作,以及分析背后的原理。
一. Spring異步線程池的接口類 :TaskExecutor
在Spring4中,Spring中引入了一個(gè)新的注解@Async,這個(gè)注解讓我們在使用Spring完成異步操作變得非常方便。
Spring異步線程池的接口類,其實(shí)質(zhì)是java.util.concurrent.Executor
Spring 已經(jīng)實(shí)現(xiàn)的異常線程池:
1. SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會創(chuàng)建一個(gè)新的線程。
2. SyncTaskExecutor:這個(gè)類沒有實(shí)現(xiàn)異步調(diào)用,只是一個(gè)同步操作。只適用于不需要多線程的地方
3. ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時(shí),才用考慮使用這個(gè)類
4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。線程池同時(shí)被quartz和非quartz使用,才需要使用此類
5. ThreadPoolTaskExecutor :最常使用,推薦。 其實(shí)質(zhì)是對java.util.concurrent.ThreadPoolExecutor的包裝,
我們查看ThreadPoolExecutor初始化的源碼就知道使用ThreadPoolExecutor。
二、簡單使用說明
Spring中用@Async注解標(biāo)記的方法,稱為異步方法。在spring boot應(yīng)用中使用@Async很簡單:
1、調(diào)用異步方法類上或者啟動類加上注解@EnableAsync
2、在需要被異步調(diào)用的方法外加上@Async
3、所使用的@Async注解方法的類對象應(yīng)該是Spring容器管理的bean對象;
啟動類加上注解@EnableAsync:
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableAsync; @SpringBootApplication @EnableAsync public class CollectorApplication { public static void main(String[] args) throws Exception { SpringApplication.run(CollectorApplication.class, args); } }
在需要被異步調(diào)用的方法外加上@Async,同時(shí)類AsyncService加上注解@Service或者@Component,使其對象成為Spring容器管理的bean對象;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @Service @Transactional public class AsyncService { @Async public void asyncMethod(String s) { System.out.println("receive:" + s); } public void test() { System.out.println("test"); asyncMethod();//同一個(gè)類里面調(diào)用異步方法 } @Async public void test2() { AsyncService asyncService = context.getBean(AsyncService.class); asyncService.asyncMethod();//異步 } /** * 異布調(diào)用返回Future */ @Async public Future<String> asyncInvokeReturnFuture(int i) { System.out.println("asyncInvokeReturnFuture, parementer="+ i); Future<String> future; try { Thread.sleep(1000 * 1); future = new AsyncResult<String>("success:" + i); } catch (InterruptedException e) { future = new AsyncResult<String>("error"); } return future; } } //異步方法和普通的方法調(diào)用相同 asyncService.asyncMethod("123"); Future<String> future = asyncService.asyncInvokeReturnFuture(100); System.out.println(future.get());
如果將一個(gè)類聲明為異步類@Async,那么這個(gè)類對外暴露的方法全部成為異步方法。
@Async @Service public class AsyncClass { public AsyncClass() { System.out.println("----init AsyncClass----"); } volatile int index = 0; public void foo() { System.out.println("asyncclass foo, index:" + index); } public void foo(int i) { this.index = i; System.out.println("asyncclass foo, index:" + i); } public void bar(int i) { this.index = i; System.out.println("asyncclass bar, index:" + i); } }
這里需要注意的是:
1、同一個(gè)類里面調(diào)用異步方法不生效:原因默認(rèn)類內(nèi)的方法調(diào)用不會被aop攔截,即調(diào)用方和被調(diào)用方是在同一個(gè)類中,是無法產(chǎn)生切面的,該對象沒有被Spring容器管理。即@Async方法不生效。
解決辦法:如果要使同一個(gè)類中的方法之間調(diào)用也被攔截,需要使用spring容器中的實(shí)例對象,而不是使用默認(rèn)的this,因?yàn)橥ㄟ^bean實(shí)例的調(diào)用才會被spring的aop攔截
本例使用方法:AsyncService asyncService = context.getBean(AsyncService.class); 然后使用這個(gè)引用調(diào)用本地的方法即可達(dá)到被攔截的目的
備注:這種方法只能攔截protected,default,public方法,private方法無法攔截。這個(gè)是spring aop的一個(gè)機(jī)制。
2、如果不自定義異步方法的線程池默認(rèn)使用SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會創(chuàng)建一個(gè)新的線程。并發(fā)大的時(shí)候會產(chǎn)生嚴(yán)重的性能問題。
3、異步方法返回類型只能有兩種:void和java.util.concurrent.Future。
1)當(dāng)返回類型為void的時(shí)候,方法調(diào)用過程產(chǎn)生的異常不會拋到調(diào)用者層面,
可以通過注AsyncUncaughtExceptionHandler來捕獲此類異常
2)當(dāng)返回類型為Future的時(shí)候,方法調(diào)用過程產(chǎn)生的異常會拋到調(diào)用者層面
三、定義通用線程池
1、定義線程池
在Spring Boot主類中定義一個(gè)線程池,public Executor taskExecutor() 方法用于自定義自己的線程池,線程池前綴”taskExecutor-”。如果不定義,則使用系統(tǒng)默認(rèn)的線程池。
@SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @EnableAsync @Configuration class TaskPoolConfig { @Bean public Executor taskExecutor1() { ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor(); pool.setCorePoolSize(5); //線程池活躍的線程數(shù) pool.setMaxPoolSize(10); //線程池最大活躍的線程數(shù) pool.setWaitForTasksToCompleteOnShutdown(true); pool.setThreadNamePrefix("defaultExecutor"); return pool; } @Bean("taskExecutor") public Executor taskExecutor2() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(200); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("taskExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.setWaitForTasksToCompleteOnShutdown(true); executor.setAwaitTerminationSeconds(60); return executor; } } }
上面我們通過ThreadPoolTaskExecutor創(chuàng)建了一個(gè)線程池,同時(shí)設(shè)置了如下參數(shù):
- 核心線程數(shù)10:線程池創(chuàng)建時(shí)初始化的線程數(shù)
- 最大線程數(shù)20:線程池最大的線程數(shù),只有在緩沖隊(duì)列滿了之后才會申請超過核心線程數(shù)的線程
- 緩沖隊(duì)列200:用來緩沖執(zhí)行任務(wù)的隊(duì)列
- 允許線程的空閑時(shí)間60秒:超過了核心線程數(shù)之外的線程,在空閑時(shí)間到達(dá)之后會被銷毀
- 線程池名的前綴:設(shè)置好了之后可以方便我們定位處理任務(wù)所在的線程池
- 線程池對拒絕任務(wù)的處理策略:此處采用了CallerRunsPolicy策略,當(dāng)線程池沒有處理能力的時(shí)候,該策略會直接在execute方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù);如果執(zhí)行程序已被關(guān)閉,則會丟棄該任務(wù)
- 設(shè)置線程池關(guān)閉的時(shí)候等待所有任務(wù)都完成再繼續(xù)銷毀其他的Bean
- 設(shè)置線程池中任務(wù)的等待時(shí)間,如果超過這個(gè)時(shí)候還沒有銷毀就強(qiáng)制銷毀,以確保應(yīng)用最后能夠被關(guān)閉,而不是阻塞住
也可以單獨(dú)類來配置線程池:
import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * Created by huangguisu on 2020/6/10. */ @Configuration @EnableAsync public class MyThreadPoolConfig { private static final int CORE_POOL_SIZE = 10; private static final int MAX_POOL_SIZE = 20; private static final int QUEUE_CAPACITY = 200; public static final String BEAN_EXECUTOR = "bean_executor"; /** * 事件和情感接口線程池執(zhí)行器配置 * @return 事件和情感接口線程池執(zhí)行器bean * */ @Bean(BEAN_EXECUTOR) public Executor executor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(CORE_POOL_SIZE); executor.setMaxPoolSize(MAX_POOL_SIZE); // 設(shè)置隊(duì)列容量 executor.setQueueCapacity(QUEUE_CAPACITY); // 設(shè)置線程活躍時(shí)間(秒) executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("SE-Pool#Task"); // 設(shè)置拒絕策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
同時(shí)注意需要在配置類上添加@EnableAsync,當(dāng)然也可以在啟動類上添加,表示開啟spring的@@Async
2、異步方法使用線程池
只需要在@Async注解中指定線程池名即可
@Component public class Task { //默認(rèn)使用線程池 @Async public void doTaskOne() throws Exception { System.out.println("開始做任務(wù)"); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); System.out.println("完成任務(wù)耗時(shí):" + (end - start) + "毫秒"); } //根據(jù)Bean Name指定特定線程池 @Async("taskExecutor") public void doTaskOne() throws Exception { System.out.println("開始做任務(wù)"); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); System.out.println("完成任務(wù)耗時(shí):" + (end - start) + "毫秒"); } }
3、通過xml配置定義線程池
Bean文件配置: spring_async.xml
1. 線程的前綴為xmlExecutor
2. 啟動異步線程池配置
<!-- 等價(jià)于 @EnableAsync, executor指定線程池 --> <task:annotation-driven executor="xmlExecutor"/> <!-- id指定線程池產(chǎn)生線程名稱的前綴 --> <task:executor id="xmlExecutor" pool-size="5-25" queue-capacity="100" keep-alive="120" rejection-policy="CALLER_RUNS"/>
啟動類導(dǎo)入xml文件:
@SpringBootApplication @ImportResource("classpath:/async/spring_async.xml") public class AsyncApplicationWithXML { private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithXML.class); public static void main(String[] args) { log.info("Start AsyncApplication.. "); SpringApplication.run(AsyncApplicationWithXML.class, args); } }
線程池參數(shù)說明
1. ‘id" : 線程的名稱的前綴
2. ‘pool-size":線程池的大小。支持范圍”min-max”和固定值(此時(shí)線程池core和max sizes相同)
3. ‘queue-capacity" :排隊(duì)隊(duì)列長度
4. ‘rejection-policy": 對拒絕的任務(wù)處理策略
5. ‘keep-alive" : 線程保活時(shí)間(單位秒)
四、異常處理
上面也提到:在調(diào)用方法時(shí),可能出現(xiàn)方法中拋出異常的情況。在異步中主要有有兩種異常處理方法:
1. 對于方法返回值是Futrue的異步方法:
a) 、一種是在調(diào)用future的get時(shí)捕獲異常;
b)、 在異常方法中直接捕獲異常
2. 對于返回值是void的異步方法:通過AsyncUncaughtExceptionHandler處理異常
@Component public class AsyncException { /** * 帶參數(shù)的異步調(diào)用 異步方法可以傳入?yún)?shù) * 對于返回值是void,異常會被AsyncUncaughtExceptionHandler處理掉 * @param s */ @Async public void asyncInvokeWithException(String s) { log.info("asyncInvokeWithParameter, parementer={}", s); throw new IllegalArgumentException(s); } /** * 異常調(diào)用返回Future * 對于返回值是Future,不會被AsyncUncaughtExceptionHandler處理,需要我們在方法中捕獲異常并處理 * 或者在調(diào)用方在調(diào)用Futrue.get時(shí)捕獲異常進(jìn)行處理 * * @param i * @return */ @Async public Future<String> asyncInvokeReturnFuture(int i) { System.out.println("asyncInvokeReturnFuture, parementer={}", i); Future<String> future; try { Thread.sleep(1000 * 1); future = new AsyncResult<String>("success:" + i); throw new IllegalArgumentException("a"); } catch (InterruptedException e) { future = new AsyncResult<String>("error"); } catch(IllegalArgumentException e){ future = new AsyncResult<String>("error-IllegalArgumentException"); } return future; } }
實(shí)現(xiàn)AsyncConfigurer接口對異常線程池更加細(xì)粒度的控制
a) 創(chuàng)建線程自己的線程池
b) 對void方法拋出的異常處理的類AsyncUncaughtExceptionHandler
@Service public class MyAsyncConfigurer implements AsyncConfigurer{ @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); threadPool.setCorePoolSize(1); threadPool.setMaxPoolSize(1); threadPool.setWaitForTasksToCompleteOnShutdown(true); threadPool.setAwaitTerminationSeconds(60 * 15); threadPool.setThreadNamePrefix("MyAsync-"); threadPool.initialize(); return threadPool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new MyAsyncExceptionHandler(); } /** * 自定義異常處理類 */ class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable throwable, Method method, Object... obj) { System.out.println("Exception message - " + throwable.getMessage()); System.out.println("Method name - " + method.getName()); for (Object param : obj) { System.out.println("Parameter value - " + param); } } } }
五、問題
上面也提到:如果不自定義異步方法的線程池默認(rèn)使用SimpleAsyncTaskExecutor。SimpleAsyncTaskExecutor:不是真的線程池,這個(gè)類不重用線程,每次調(diào)用都會創(chuàng)建一個(gè)新的線程。并發(fā)大的時(shí)候會產(chǎn)生嚴(yán)重的性能問題。
一般的錯(cuò)誤OOM:OutOfMemoryError:unable to create new native thread,創(chuàng)建線程數(shù)量太多,占用內(nèi)存過大.
解決辦法:一般最好使用自定義線程池,做一些特殊策略, 比如自定義拒絕策略,如果隊(duì)列滿了,則拒絕處理該任務(wù)。
到此這篇關(guān)于Spring Boot之@Async異步線程池的文章就介紹到這了,更多相關(guān)Spring Boot @Async異步線程池內(nèi)容請搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!
原文鏈接:https://blog.csdn.net/hguisu/article/details/106671893