一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - PHP教程 - 如何用RabbitMQ和Swoole實現一個異步任務系統

如何用RabbitMQ和Swoole實現一個異步任務系統

2021-11-17 15:59八重櫻 PHP教程

從最開始的使用redis實現的單進程消費的異步任務系統到加入swoole的多進程消費模式,現在,我們的異步任務系統終于又能邁進一步。這回基于RabbitMQ的異步任務系統設計的的更加完善,包括多進程消費,異常重試等。

系統介紹

如何用RabbitMQ和Swoole實現一個異步任務系統

從圖中可以看到,我們這個系統是一個基于事件的異步任務系統。就是說當一個事件產生時,生產者將事件拋給調度器,調度器負責查詢事件下有哪些任務,然后將這些任務丟到相應的隊列中,最后由消費者消費任務隊列中的任務。

在整個系統中主要分為三大部分

1.事件生產者,即產生消息事件的一方。

2.任務調度器(Scheduler),負責注冊事件并調度任務。

3.消費者(Worker),負責消費任務隊列中的任務。

事件生產者

事件生產者很簡單,在業務系統中直接調用即可,代碼如下。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php
 
require_once DIR.'/../autoload.php';
 
use Asynclib\Ebats\Event;
 
try{
 
    $event = new Event('order_paied');  //定義事件
 
    $event->setOptions(['order_id' => 'FB138020392193312']); //事件產生的參數
 
    $event->publish();
 
}catch (Exception $exc){
 
    echo $exc->getMessage();
 
}

任務調度器

調度器主要做兩件事,一是注冊事件,另一個是調度任務。

注冊事件代碼如下:

?
1
2
3
4
5
//注冊事件
 
EventManager::register('order_create', 'closeOrder', 'demo', 10);//關閉未付款訂單(延遲任務)
 
EventManager::register('order_paied', 'virtualShipping', 'demo'); //虛擬商品自動發貨

這樣就注冊了兩個事件,事件下各有一個任務。

具體調度部分代碼很簡單,就不多贅述,有興趣的可以去看代碼。

消費者

重頭戲來了,一個異步任務系統最重要的就是消費端了,現在讓我們來看下Worker的流程圖。

如何用RabbitMQ和Swoole實現一個異步任務系統

可以看到,在這里我們采用了兩個交換器和兩個隊列,一個負責處理正常的任務即ntask,另一個負責處理需要延遲執行的任務即dtask。簡單描述下一個任務的生命周期。

正常任務

1、task產生,進入正常任務的交換器Exchange[ebats_core_ntask]

2、交換器根據topic將任務分發到對應的隊列中

3、子進程ntask阻塞等待成功獲取到task,并執行該任務

4、執行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException

5、子進程ntask捕獲到重試異常將任務拋給延遲任務的交換器Exchange[ebats_core_dtask]

6、將任務執行信息回調給上層開發者以便保存查看

延遲任務

1、子進程dtask阻塞等待成功獲取到task,并執行該任務
2、執行失敗,需要重試時拋出RetryException,不需要重試時拋出TaskException
3、子進程dtask捕獲到重試異常將任務拋給延遲任務的交換器Exchange[ebats_core_dtask]
4、將任務執行信息回調給上層開發者以便保存查看

消費者代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
require_once DIR.'/../autoload.php';
 
require_once DIR.'/task/TaskDemoModel.php';
 
use Asynclib\Ebats\Worker;
 
  
 
//執行結果回調函數
 
$callback = function ($topic, $taskid, $taskname, $params, $timeuse, $message){
 
  
 
};
 
$worker = new Worker($callback);  //支持多進程消費默認為1
 
$worker->setQueue('demo');  //隊列名和事件的topic一一對應
 
$worker->run();

自定義調度器

一般來說這是一個基于事件的任務系統,那么能不能直接產生任務呢。答案是肯定的。

只需要創建一個自定義調度器,由您自行實現調度邏輯,最終生成一個任務即可。代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<?php
 
require_once DIR.'/../autoload.php';
 
use Asynclib\Ebats\Task;
 
use Asynclib\Core\Consumer;
 
use Asynclib\Amq\ExchangeTypes;
 
use Asynclib\Exception\ExceptionInterface;
 
  
 
/**
 
 * 本示例演示了如何創建一個自定義調度器,開發者可以根據自身需求開發自己的任務調度器
 
 */
 
try{
 
    $worker = new Consumer();
 
    $worker->setExchange('order_fanout', ExchangeTypes::TOPIC);
 
    $worker->setQueue('shzf_order_paied', ['*.*.WAIT_SELLER_SEND_GOODS']);
 
    $worker->run(function($key, $msg){
 
        $order_data = json_encode($msg);
 
        echo " [$key] $order_data \n";
 
        Task::create('demo', 'orderAsync', $msg);//創建任務,之后消息將作為參數由任務接管處理
 
    });
 
}catch (ExceptionInterface $exc){
 
    echo $exc->getMessage();
 
}

這樣,當接收到消息時就會產生一個orderAsync的任務,您只需要啟動一個用來消費這個Topic的Worker即可。

也許你會覺得這里直接寫業務邏輯的代碼就可以了,實際上也確實可以。當你可以忍受一個進程慢慢消費的時候是可以這樣做的。但大多數情況下我們還是希望它能夠盡快的消費掉,所以建議這里只負責創建任務,具體任務的業務邏輯由worker去執行。

以上就是如何用RabbitMQ和Swoole實現一個異步任務系統的詳細內容,更多關于用RabbitMQ和Swoole實現一個異步任務系統的資料請關注服務器之家其它相關文章!

原文鏈接:https://www.cnblogs.com/a609251438/p/12510476.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 九九精品免费视频 | 免费视频专区一国产盗摄 | 91久久国产成人免费观看资源 | 秋霞网毛片 | 亚洲天堂网在线观看视频 | 女同色图 | 超级碰碰免费视频 | 免费理伦片高清在线 | 出差被灌醉绝伦的上司日本 | 边摸边吃奶边做爽gif动态图 | 亚洲午夜精品久久久久 | 成人福利在线播放 | 精品午夜久久福利大片免费 | 亚洲AV精品一区二区三区不卡 | 国产一精品一av一免费爽爽 | 欧美特欧美特级一片 | 国产精品一级香蕉一区 | 久九九精品免费视频 | 国模孕妇季玥全部人体写真 | 日本孕妇与黑人xxxxxx | 精品国产麻豆免费人成网站 | 我的奶头被客人吸的又肿又红 | 国产一区二区三区久久精品 | 999精品视频在线观看 | 红怡院欧洲| 免费精品视频在线 | 美女和男人免费网站视频 | 日韩免费 | 5g影院天天影院天天爽影院网站 | 国产一区二区视频在线播放 | 热99re久久精品精品免费 | 无遮18禁在线永久免费观看挡 | 隔壁老王国产在线精品 | 色淫影院 | 深夜福利影院在线观看 | 久久亚洲精品AV成人无 | 久久婷婷五月综合色丁香花 | 99成人| a毛片免费观看完整 | 国产二区精品视频 | 日本小视频网站 |