前言
rabbitmq 是使用 erlang 語言開發的消息中間件, 其遵循了高級消息隊列協議(advanced message queuing protocol, amqp)。
與 kafka 等消息隊列相比,rabbitmq 最大的優勢在于其較高的可靠性:
- 提供確認(ack)和重傳機制保證消息完成消費, 消費者異常不會導致消息丟失
- 提供消息持久化機制, broker 崩潰不會導致消息丟失
- 集群模式下工作, 保證高可用
因為具有較高可靠性和一致性, rabbitmq 可以勝任訂單處理、秒殺等一致性要求較高的業務場景。
rabbitmq 概念與機制
rabbitmq 中的概念模型:
- broker: 消息中間件實例, 可能是單個節點也可能是運行在多節點集群上的邏輯實體
- 消息(message): 消息由消息頭和消息體兩部分組成。消息頭中包括routing-key、priority等標準消息頭以及其它自定義消息頭,用于定義rabbitmq對消息行為。消息體是字節流,包含消息內容。
- 連接(connection): 客戶端與 broker 之間的 tcp連接
- 信道(channel): channel 是建立在 tcp 連接上的邏輯(虛擬)連接。多個 channel 復用同一個 tcp 連接, 以避免建立 tcp 連接的巨大開銷。 rabbitmq 官方要求每個線程使用獨立的 channel, 禁止多個線程共用 channel。
- 生產者(publisher): 發送消息的客戶端線程
- 消費者(consumer): 處理消息的客戶端線程
- 交換機(exchange): 交換機負責將消息投遞到相應的隊列
- 隊列(queue): 接收并保存交換機投遞的消息,直至被消費者成功消費。邏輯結構遵循先進先出fifo。
- 綁定(binding): 將隊列(queue)注冊到交換機(exchange)的路由表
- 虛擬主機(vhost): 每個broker下可建立多個vhost, 每個 vhost 可建立獨立的 exchange、queue、綁定及權限系統。同一個 broker 下的 vhost 共享 connection、channel 和 用戶系統,就是說可以使用同一個用戶身份使用同一個 channel 訪問不同 vhost。
交換機(exchange)
生產者發送的消息會首先送到交換機(exchange), 交換機根據自身類型和消息的 routing-key 等信息將消息投遞到綁定的消息隊列中。
rabbitmq中的四種標準交換機:
direct: 如果消息的 routing-key 與隊列的 binding-key 完全相同,direct類型的交換機則會將消息投遞到該隊列中。
- 多個隊列可以使用相同的 binding-key 綁定到同一個 direct 交換機,direct 交換機會把消息投遞到所有 binding-key 與消息 routing-key 相同的隊列
topic: 允許隊列的 binding-key 中包含通配符*和#, topic 交換機會將消息投遞到 binding-key 與 routing-key 匹配的隊列中。
- 通配符按照關鍵字進行匹配,如news.cn.a中的關鍵字是news、cn和a,即關鍵字按照.分割
- #通配符匹配0個或多個關鍵字, news.#.a可以匹配news.a, news.cn.a和news.asia.cn.a等
- *通配符匹配一個關鍵字, news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a
fanout: fanout 交換機不進行任何匹配, 將消息投遞到所有綁定的隊列
header: header 交換機根據消息頭進行投遞,現在已較少使用
我們可以使用 rabbitmq 的插件機制使用第三方交換機或自行開發交換機。如實現延時投遞的。
消息頭中的delivery-mode可以設置為 persistent(持久化) 或者 transient(易失)。 exchange 和 queue 在處理持久化的消息時都會先將消息寫入磁盤中再進行下一步處理, 即使 rabbitmq 崩潰也不會丟失。
消費者客戶端通常使用的channel.basicconsume使用推(push)模式投遞消息, 即當有新消息時 broker 通過 channel 主動向客戶端發送消息。客戶端也可以使用channel.basicget從 broker 拉取消息。
ack機制
rabbitmq 提供了確認送達(acknowledge)機制保證消息被正確處理不會丟失。
確認送達的回執有三種:
- ack: 消息已被成功處理
- nack: 消息處理異常, 需要重新投遞
- reject: 消息非法, 丟棄消息
rabbitmq 的 queue 可以設置 no_ack=true, 則消息被投遞后即刪除不等待回執。
channel.basicconsume 可以指定auto_ack模式,若auto_ack=true當客戶端收到完整消息后即會自動發出ack回執,否則必須顯式的發出回執。
java 代碼示例
首先安裝并啟動rabbitmq實例, mac用戶可以使用 homebrew 進行安裝:
1
|
brew install rabbitmq |
啟動服務:
1
|
brew services start rabbitmq |
或者使用官方docker鏡像:
1
|
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq: 3 -management |
rabbitmq官網提供了ubuntu、rpm以及windows等多種平臺安裝方式。
rabbitmq默認tcp端口為5672, web控制臺默認端口15672。
在maven中添加依賴:
1
2
3
4
5
|
<dependency> <groupid>com.rabbitmq</groupid> <artifactid>amqp-client</artifactid> <version> 5.5 . 1 </version> </dependency> |
編寫生產者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package rabbit; import java.io.ioexception; import java.util.concurrent.timeoutexception; import com.rabbitmq.client.amqp; import com.rabbitmq.client.channel; import com.rabbitmq.client.connection; import com.rabbitmq.client.connectionfactory; /** * @author finley */ public class rabbitproducer { public static void main(string[] args) throws ioexception, timeoutexception { connectionfactory factory = new connectionfactory(); factory.setusername( "guest" ); factory.setpassword( "guest" ); factory.sethost( "localhost" ); try (connection conn = factory.newconnection(); channel channel = conn.createchannel()) { string exchangename = "test-exchange" ; channel.exchangedeclare(exchangename, "direct" , true ); string routingkey = "hello" ; byte [] msg = "hello world" .getbytes(); amqp.basicproperties.builder propsbuilder = new amqp.basicproperties.builder(); propsbuilder.deliverymode( 2 ); // persistent propsbuilder.priority( 0 ); // normal propsbuilder.contenttype( "text/plain" ); channel.basicpublish(exchangename, routingkey, propsbuilder.build(), msg); } } } |
編寫消費者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package rabbit; import java.io.ioexception; import java.util.concurrent.timeoutexception; import com.rabbitmq.client.*; /** * @author finley */ public class rabbitconsumer { public static void main(string[] args) throws ioexception, timeoutexception { connectionfactory factory = new connectionfactory(); factory.setusername( "guest" ); factory.setpassword( "guest" ); factory.sethost( "localhost" ); try (connection conn = factory.newconnection(); channel channel = conn.createchannel()) { string exchangename = "test-exchange" ; channel.exchangedeclare(exchangename, "direct" , true ); string queuename = channel.queuedeclare().getqueue(); string bindingkey = "hello" ; channel.queuebind(queuename, exchangename, bindingkey); while ( true ) { channel.basicconsume(queuename, false , "" , new defaultconsumer(channel) { @override public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte [] body) throws ioexception { string routingkey = envelope.getroutingkey(); string contenttype = properties.getcontenttype(); string bodystr = new string(body, "utf-8" ); system.out.println( "routingkey: " + routingkey + ", contenttype: " + contenttype + ", body: " + bodystr); long deliverytag = envelope.getdeliverytag(); channel.basicack(deliverytag, false ); } }); } } } } |
rabbitmq 的消息為字節, 可以將 java 對象序列化后作為消息體發送。
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。
原文鏈接:https://www.cnblogs.com/Finley/p/10126315.html