一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - PHP教程 - PHP+RabbitMQ實現消息隊列的完整代碼

PHP+RabbitMQ實現消息隊列的完整代碼

2021-07-26 12:19SokminYo PHP教程

這篇文章主要給大家介紹了關于利用PHP+RabbitMQ實現消息隊列的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用PHP具有一定的參考學習價值,需要的朋友們下面來一起學習學習吧

前言

為什么使用RabbitMq而不是ActiveMq或者RocketMq?

首先,從業務上來講,我并不要求消息的100%接受率,并且,我需要結合php開發,RabbitMq相較RocketMq,延遲較低(微妙級)。至于ActiveMq,貌似問題較多。RabbitMq對各種語言的支持較好,所以選擇RabbitMq。

先安裝PHP對應的RabbitMQ,這里用的是 php_amqp 不同的擴展實現方式會有細微的差異.

php擴展地址: http://pecl.php.net/package/amqp

具體以官網為準  http://www.rabbitmq.com/getstarted.html

介紹

  • config.php 配置信息
  • BaseMQ.php MQ基類
  • ProductMQ.php 生產者類
  • ConsumerMQ.php 消費者類
  • Consumer2MQ.php 消費者2(可有多個)

config.php

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<?php
return [
 //配置
 'host' => [
  'host' => '127.0.0.1',
  'port' => '5672',
  'login' => 'guest',
  'password' => 'guest',
  'vhost'=>'/',
 ],
 //交換機
 'exchange'=>'word',
 //路由
 'routes' => [],
];

BaseMQ.php

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
<?php
/**
 * Created by PhpStorm.
 * User: pc
 * Date: 2018/12/13
 * Time: 14:11
 */
 
namespace MyObjSummary\rabbitMQ;
 
/** Member
 *  AMQPChannel
 *  AMQPConnection
 *  AMQPEnvelope
 *  AMQPExchange
 *  AMQPQueue
 * Class BaseMQ
 * @package MyObjSummary\rabbitMQ
 */
class BaseMQ
{
 /** MQ Channel
  * @var \AMQPChannel
  */
 public $AMQPChannel ;
 
 /** MQ Link
  * @var \AMQPConnection
  */
 public $AMQPConnection ;
 
 /** MQ Envelope
  * @var \AMQPEnvelope
  */
 public $AMQPEnvelope ;
 
 /** MQ Exchange
  * @var \AMQPExchange
  */
 public $AMQPExchange ;
 
 /** MQ Queue
  * @var \AMQPQueue
  */
 public $AMQPQueue ;
 
 /** conf
  * @var
  */
 public $conf ;
 
 /** exchange
  * @var
  */
 public $exchange ;
 
 /** link
  * BaseMQ constructor.
  * @throws \AMQPConnectionException
  */
 public function __construct()
 {
  $conf = require 'config.php' ;
  if(!$conf)
   throw new \AMQPConnectionException('config error!');
  $this->conf  = $conf['host'] ;
  $this->exchange = $conf['exchange'] ;
  $this->AMQPConnection = new \AMQPConnection($this->conf);
  if (!$this->AMQPConnection->connect())
   throw new \AMQPConnectionException("Cannot connect to the broker!\n");
 }
 
 /**
  * close link
  */
 public function close()
 {
  $this->AMQPConnection->disconnect();
 }
 
 /** Channel
  * @return \AMQPChannel
  * @throws \AMQPConnectionException
  */
 public function channel()
 {
  if(!$this->AMQPChannel) {
   $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
  }
  return $this->AMQPChannel;
 }
 
 /** Exchange
  * @return \AMQPExchange
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
  */
 public function exchange()
 {
  if(!$this->AMQPExchange) {
   $this->AMQPExchange = new \AMQPExchange($this->channel());
   $this->AMQPExchange->setName($this->exchange);
  }
  return $this->AMQPExchange ;
 }
 
 /** queue
  * @return \AMQPQueue
  * @throws \AMQPConnectionException
  * @throws \AMQPQueueException
  */
 public function queue()
 {
  if(!$this->AMQPQueue) {
   $this->AMQPQueue = new \AMQPQueue($this->channel());
  }
  return $this->AMQPQueue ;
 }
 
 /** Envelope
  * @return \AMQPEnvelope
  */
 public function envelope()
 {
  if(!$this->AMQPEnvelope) {
   $this->AMQPEnvelope = new \AMQPEnvelope();
  }
  return $this->AMQPEnvelope;
 }
}

ProductMQ.php

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
<?php
//生產者 P
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ProductMQ extends BaseMQ
{
 private $routes = ['hello','word']; //路由key
 
 /**
  * ProductMQ constructor.
  * @throws \AMQPConnectionException
  */
 public function __construct()
 {
  parent::__construct();
 }
 
 /** 只控制發送成功 不接受消費者是否收到
  * @throws \AMQPChannelException
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
  */
 public function run()
 {
  //頻道
  $channel = $this->channel();
  //創建交換機對象
  $ex = $this->exchange();
  //消息內容
  $message = 'product message '.rand(1,99999);
  //開始事務
  $channel->startTransaction();
  $sendEd = true ;
  foreach ($this->routes as $route) {
   $sendEd = $ex->publish($message, $route) ;
   echo "Send Message:".$sendEd."\n";
  }
  if(!$sendEd) {
   $channel->rollbackTransaction();
  }
  $channel->commitTransaction(); //提交事務
  $this->close();
  die ;
 }
}
try{
 (new ProductMQ())->run();
}catch (\Exception $exception){
 var_dump($exception->getMessage()) ;
}

ConsumerMQ.php

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<?php
//消費者 C
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ConsumerMQ extends BaseMQ
{
 private $q_name = 'hello'; //隊列名
 private $route = 'hello'; //路由key
 
 /**
  * ConsumerMQ constructor.
  * @throws \AMQPConnectionException
  */
 public function __construct()
 {
  parent::__construct();
 }
 
 /** 接受消息 如果終止 重連時會有消息
  * @throws \AMQPChannelException
  * @throws \AMQPConnectionException
  * @throws \AMQPExchangeException
  * @throws \AMQPQueueException
  */
 public function run()
 {
 
  //創建交換機
  $ex = $this->exchange();
  $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
  $ex->setFlags(AMQP_DURABLE); //持久化
  //echo "Exchange Status:".$ex->declare()."\n";
 
  //創建隊列
  $q = $this->queue();
  //var_dump($q->declare());exit();
  $q->setName($this->q_name);
  $q->setFlags(AMQP_DURABLE); //持久化
  //echo "Message Total:".$q->declareQueue()."\n";
 
  //綁定交換機與隊列,并指定路由鍵
  echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n";
 
  //阻塞模式接收消息
  echo "Message:\n";
  while(True){
   $q->consume(function ($envelope,$queue){
    $msg = $envelope->getBody();
    echo $msg."\n"; //處理消息
    $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答
   });
   //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答
  }
  $this->close();
 }
}
try{
 (new ConsumerMQ)->run();
}catch (\Exception $exception){
 var_dump($exception->getMessage()) ;
}

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對服務器之家的支持。

原文鏈接:https://segmentfault.com/a/1190000018515670

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 91热国内精品永久免费观看 | 精品一区二区三区在线成人 | 国产在线观看人成激情视频 | 天堂俺去俺来也www久久婷婷 | 日韩毛片免费 | poverty中国老妇人 | 国产在线乱子伦一区二区 | 免费一级欧美大片在线观看 | 吃大胸寡妇的奶 | 国产91在线免费 | 国产成人高清精品免费观看 | 欧美在线成人免费国产 | 色橹橹 | 三星w699| 亚洲AV久久久噜噜噜久久 | 国产精品久久久久久久福利院 | 日本a在线天堂 | 娇妻被老外疯狂调教 | 国产一区在线免费观看 | 成人久久伊人精品伊人 | 亚洲精品www久久久久久 | 高人先生免费观看全集 | 国产在线视频欧美亚综合 | 亚洲AV永久无码精品老司机蜜桃 | 久久91精品国产91久久户 | 天海翼黄色三级 | 草留色区| 国产午夜精品久久久久 | 亚洲骚图 | a级黄色网 | 日日操视频 | 日本精品一区二区三区 | 亚洲H成年动漫在线观看不卡 | 秋霞网毛片 | 男女做污事 | 亚洲精品视频免费在线观看 | 国产高清不卡视频在线播放 | 香蕉精品国产高清自在自线 | jizz农村野外jizz农民 | 精品国产日韩一区三区 | 天堂va在线高清一区 |