系統介紹
從圖中可以看到,我們這個系統是一個基于事件的異步任務系統。就是說當一個事件產生時,生產者將事件拋給調度器,調度器負責查詢事件下有哪些任務,然后將這些任務丟到相應的隊列中,最后由消費者消費任務隊列中的任務。
在整個系統中主要分為三大部分
1.事件生產者,即產生消息事件的一方。
2.任務調度器(Scheduler),負責注冊事件并調度任務。
3.消費者(Worker),負責消費任務隊列中的任務。
事件生產者
事件生產者很簡單,在業務系統中直接調用即可,代碼如下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
<?php require_once DIR. '/../autoload.php' ; use Asynclib\Ebats\Event; try { $event = new Event( 'order_paied' ); //定義事件 $event ->setOptions([ 'order_id' => 'FB138020392193312' ]); //事件產生的參數 $event ->publish(); } catch (Exception $exc ){ echo $exc ->getMessage(); } |
任務調度器
調度器主要做兩件事,一是注冊事件,另一個是調度任務。
注冊事件代碼如下:
1
2
3
4
5
|
//注冊事件 EventManager::register( 'order_create' , 'closeOrder' , 'demo' , 10); //關閉未付款訂單(延遲任務) EventManager::register( 'order_paied' , 'virtualShipping' , 'demo' ); //虛擬商品自動發貨 |
這樣就注冊了兩個事件,事件下各有一個任務。
具體調度部分代碼很簡單,就不多贅述,有興趣的可以去看代碼。
消費者
重頭戲來了,一個異步任務系統最重要的就是消費端了,現在讓我們來看下Worker的流程圖。
可以看到,在這里我們采用了兩個交換器和兩個隊列,一個負責處理正常的任務即ntask,另一個負責處理需要延遲執行的任務即dtask。簡單描述下一個任務的生命周期。
正常任務
1、task產生,進入正常任務的交換器Exchange[ebats_core_ntask]
2、交換器根據topic將任務分發到對應的隊列中
3、子進程ntask阻塞等待成功獲取到task,并執行該任務
4、執行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
5、子進程ntask捕獲到重試異常將任務拋給延遲任務的交換器Exchange[ebats_core_dtask]
6、將任務執行信息回調給上層開發者以便保存查看
延遲任務
1、子進程dtask阻塞等待成功獲取到task,并執行該任務
2、執行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
3、子進程dtask捕獲到重試異常將任務拋給延遲任務的交換器Exchange[ebats_core_dtask]
4、將任務執行信息回調給上層開發者以便保存查看
消費者代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
require_once DIR. '/../autoload.php' ; require_once DIR. '/task/TaskDemoModel.php' ; use Asynclib\Ebats\Worker; //執行結果回調函數 $callback = function ( $topic , $taskid , $taskname , $params , $timeuse , $message ){ }; $worker = new Worker( $callback ); //支持多進程消費默認為1 $worker ->setQueue( 'demo' ); //隊列名和事件的topic一一對應 $worker ->run(); |
自定義調度器
一般來說這是一個基于事件的任務系統,那么能不能直接產生任務呢。答案是肯定的。
只需要創建一個自定義調度器,由您自行實現調度邏輯,最終生成一個任務即可。代碼如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
<?php require_once DIR. '/../autoload.php' ; use Asynclib\Ebats\Task; use Asynclib\Core\Consumer; use Asynclib\Amq\ExchangeTypes; use Asynclib\Exception\ExceptionInterface; /** * 本示例演示了如何創建一個自定義調度器,開發者可以根據自身需求開發自己的任務調度器 */ try { $worker = new Consumer(); $worker ->setExchange( 'order_fanout' , ExchangeTypes::TOPIC); $worker ->setQueue( 'shzf_order_paied' , [ '*.*.WAIT_SELLER_SEND_GOODS' ]); $worker ->run( function ( $key , $msg ){ $order_data = json_encode( $msg ); echo " [$key] $order_data \n" ; Task::create( 'demo' , 'orderAsync' , $msg ); //創建任務,之后消息將作為參數由任務接管處理 }); } catch (ExceptionInterface $exc ){ echo $exc ->getMessage(); } |
這樣,當接收到消息時就會產生一個orderAsync的任務,您只需要啟動一個用來消費這個Topic的Worker即可。
也許你會覺得這里直接寫業務邏輯的代碼就可以了,實際上也確實可以。當你可以忍受一個進程慢慢消費的時候是可以這樣做的。但大多數情況下我們還是希望它能夠盡快的消費掉,所以建議這里只負責創建任務,具體任務的業務邏輯由worker去執行。
以上就是如何用RabbitMQ和Swoole實現一個異步任務系統的詳細內容,更多關于用RabbitMQ和Swoole實現一個異步任務系統的資料請關注服務器之家其它相關文章!
原文鏈接:https://www.cnblogs.com/a609251438/p/12510476.html