前言
為什么使用RabbitMq而不是ActiveMq或者RocketMq?
首先,從業務上來講,我并不要求消息的100%接收率,并且,我需要結合php開發,RabbitMq相較RocketMq,延遲較低(微秒級)。至于ActiveMq,貌似問題較多。RabbitMq對各種語言的支持較好,所以選擇RabbitMq。
先安裝PHP對應的RabbitMQ擴展,這里用的是 php_amqp;不同的擴展實現方式會有細微的差異。
php擴展地址: http://pecl.php.net/package/amqp
具體以官網為準 http://www.rabbitmq.com/getstarted.html
介紹
- config.php 配置信息
- BaseMQ.php MQ基類
- ProductMQ.php 生產者類
- ConsumerMQ.php 消費者類
- Consumer2MQ.php 消費者2(可有多個)
config.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
<?php

// Settings consumed by BaseMQ: broker connection parameters and the
// name of the exchange that producers and consumers work against.
return [
    // Connection parameters; BaseMQ passes this sub-array straight to
    // the AMQPConnection constructor.
    'host' => [
        'host'     => '127.0.0.1',
        'port'     => '5672',
        'login'    => 'guest',
        'password' => 'guest',
        'vhost'    => '/',
    ],

    // Exchange name.
    'exchange' => 'word',

    // Routing keys (not read by any of the classes shown alongside this file).
    'routes' => [],
];
BaseMQ.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
<?php
/**
 * Created by PhpStorm.
 * User: pc
 * Date: 2018/12/13
 * Time: 14:11
 */

namespace MyObjSummary\rabbitMQ;

/**
 * Shared base class for the RabbitMQ producer and consumer examples.
 *
 * The constructor loads config.php from this file's directory and opens the
 * broker connection. The channel, exchange, queue and envelope objects are
 * created lazily, once per instance, by the accessor methods below.
 *
 * Members: AMQPChannel, AMQPConnection, AMQPEnvelope, AMQPExchange, AMQPQueue
 *
 * Class BaseMQ
 * @package MyObjSummary\rabbitMQ
 */
class BaseMQ
{
    /** MQ channel
     * @var \AMQPChannel
     */
    public $AMQPChannel;

    /** MQ connection
     * @var \AMQPConnection
     */
    public $AMQPConnection;

    /** MQ envelope
     * @var \AMQPEnvelope
     */
    public $AMQPEnvelope;

    /** MQ exchange
     * @var \AMQPExchange
     */
    public $AMQPExchange;

    /** MQ queue
     * @var \AMQPQueue
     */
    public $AMQPQueue;

    /** Connection parameters (the 'host' section of config.php)
     * @var array
     */
    public $conf;

    /** Exchange name (the 'exchange' entry of config.php)
     * @var string
     */
    public $exchange;

    /**
     * Load the configuration and connect to the broker.
     *
     * BaseMQ constructor.
     * @throws \AMQPConnectionException when the configuration is missing or
     *                                  malformed, or the connection fails
     */
    public function __construct()
    {
        // Anchor the path to this file: a bare 'config.php' is resolved via
        // include_path / the working directory and may pick up another file.
        $conf = self::loadConfig(__DIR__ . '/config.php');

        $this->conf = $conf['host'];
        $this->exchange = $conf['exchange'];

        $this->AMQPConnection = new \AMQPConnection($this->conf);
        if (!$this->AMQPConnection->connect()) {
            throw new \AMQPConnectionException("Cannot connect to the broker!\n");
        }
    }

    /**
     * Read a configuration file and check that it has the expected shape.
     *
     * @param string $file path of a PHP file that returns the config array
     * @return array config with an array 'host' entry and an 'exchange' entry
     * @throws \AMQPConnectionException when the file is missing or malformed
     */
    private static function loadConfig($file)
    {
        // A missing file would otherwise be an uncatchable fatal error.
        if (!is_file($file)) {
            throw new \AMQPConnectionException('config error! missing file: ' . $file);
        }

        $conf = require $file;

        if (!is_array($conf)
            || !isset($conf['host'], $conf['exchange'])
            || !is_array($conf['host'])
        ) {
            throw new \AMQPConnectionException('config error!');
        }

        return $conf;
    }

    /**
     * Close the broker connection.
     */
    public function close()
    {
        $this->AMQPConnection->disconnect();
    }

    /**
     * Return the channel, creating it on first use.
     *
     * @return \AMQPChannel
     * @throws \AMQPConnectionException
     */
    public function channel()
    {
        if (!$this->AMQPChannel) {
            $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
        }

        return $this->AMQPChannel;
    }

    /**
     * Return the exchange object, creating it on first use and naming it
     * after the configured exchange. This does not declare the exchange.
     *
     * @return \AMQPExchange
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function exchange()
    {
        if (!$this->AMQPExchange) {
            $this->AMQPExchange = new \AMQPExchange($this->channel());
            $this->AMQPExchange->setName($this->exchange);
        }

        return $this->AMQPExchange;
    }

    /**
     * Return the queue object, creating it on first use. The queue is left
     * unnamed and undeclared; callers configure it themselves.
     *
     * @return \AMQPQueue
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    public function queue()
    {
        if (!$this->AMQPQueue) {
            $this->AMQPQueue = new \AMQPQueue($this->channel());
        }

        return $this->AMQPQueue;
    }

    /**
     * Return the envelope object, creating it on first use.
     *
     * @return \AMQPEnvelope
     */
    public function envelope()
    {
        if (!$this->AMQPEnvelope) {
            $this->AMQPEnvelope = new \AMQPEnvelope();
        }

        return $this->AMQPEnvelope;
    }
}
ProductMQ.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
<?php
// Producer (P)

namespace MyObjSummary\rabbitMQ;

// Resolve relative to this file so the script works from any working directory.
require_once __DIR__ . '/BaseMQ.php';

/**
 * Publishes one randomly numbered message to every routing key in $routes,
 * inside a single AMQP transaction.
 */
class ProductMQ extends BaseMQ
{
    /** Routing keys each message is published with
     * @var string[]
     */
    private $routes = ['hello', 'word'];

    /**
     * ProductMQ constructor.
     * @throws \AMQPConnectionException
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Publish the message to every route and commit, or roll back if any
     * publish reports failure. Only sending is controlled here; whether a
     * consumer receives the message is not tracked.
     *
     * The connection is closed when the method finishes, including when an
     * exception is thrown.
     *
     * NOTE(review): the exchange is not declared here; it is presumably
     * expected to exist already on the broker - confirm.
     *
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    public function run()
    {
        try {
            // Channel
            $channel = $this->channel();
            // Exchange object
            $ex = $this->exchange();
            // Message body
            $message = 'product message ' . rand(1, 99999);

            // Begin the transaction
            $channel->startTransaction();

            $allSent = true;
            foreach ($this->routes as $route) {
                $sendEd = $ex->publish($message, $route);
                echo "Send Message:" . $sendEd . "\n";
                // Remember every failure, not just the last result. Only an
                // explicit false counts as failure: extension versions whose
                // publish() returns nothing signal errors by throwing.
                if ($sendEd === false) {
                    $allSent = false;
                }
            }

            // Commit or roll back - never both.
            if ($allSent) {
                $channel->commitTransaction();
            } else {
                $channel->rollbackTransaction();
            }
        } finally {
            // Runs on the exception path too; the transaction is left
            // uncommitted there.
            $this->close();
        }
    }
}

try {
    (new ProductMQ())->run();
} catch (\Exception $exception) {
    var_dump($exception->getMessage());
}
ConsumerMQ.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
<?php
// Consumer (C)

namespace MyObjSummary\rabbitMQ;

// Resolve relative to this file so the script works from any working directory.
require_once __DIR__ . '/BaseMQ.php';

/**
 * Binds the durable queue 'hello' to the configured exchange and prints
 * every message it receives, acknowledging each one manually.
 */
class ConsumerMQ extends BaseMQ
{
    /** Queue name
     * @var string
     */
    private $q_name = 'hello';

    /** Routing key the queue is bound with
     * @var string
     */
    private $route = 'hello';

    /**
     * ConsumerMQ constructor.
     * @throws \AMQPConnectionException
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Declare the exchange and queue, bind them, then consume messages.
     *
     * The connection is closed when consuming stops, including when an
     * exception is thrown.
     *
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     * @throws \AMQPQueueException
     */
    public function run()
    {
        try {
            // Create the exchange: direct type, durable.
            $ex = $this->exchange();
            $ex->setType(AMQP_EX_TYPE_DIRECT);
            $ex->setFlags(AMQP_DURABLE);
            // Without this the later bind() fails when the exchange does not
            // already exist on the broker.
            $ex->declareExchange();

            // Create the queue: named, durable.
            $q = $this->queue();
            $q->setName($this->q_name);
            $q->setFlags(AMQP_DURABLE);
            $q->declareQueue();

            // Bind the queue to the exchange with the routing key.
            echo 'Queue Bind: ' . $q->bind($this->exchange, $this->route) . "\n";

            // Receive messages in blocking mode. consume() is documented to
            // keep delivering until the callback returns false, which this
            // callback never does, so no surrounding loop is needed.
            echo "Message:\n";
            $q->consume(function ($envelope, $queue) {
                $msg = $envelope->getBody();
                echo $msg . "\n"; // process the message
                $queue->ack($envelope->getDeliveryTag()); // manual ACK
            });
            // For automatic ACK: $q->consume('processMessage', AMQP_AUTOACK);
        } finally {
            $this->close();
        }
    }
}

try {
    (new ConsumerMQ)->run();
} catch (\Exception $exception) {
    var_dump($exception->getMessage());
}
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對服務器之家的支持。
原文鏈接:https://segmentfault.com/a/1190000018515670