一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - RabbitMQ消息中間件示例詳解

RabbitMQ消息中間件示例詳解

2021-06-22 13:27-Finley- Java教程

這篇文章主要給大家介紹了關于RabbitMQ消息中間件的相關資料,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一學習學習吧

前言

rabbitmq 是使用 erlang 語言開發的消息中間件, 其遵循了高級消息隊列協議(advanced message queuing protocol, amqp)。

與 kafka 等消息隊列相比,rabbitmq 最大的優勢在于其較高的可靠性:

  • 提供確認(ack)和重傳機制保證消息完成消費, 消費者異常不會導致消息丟失
  • 提供消息持久化機制, broker 崩潰不會導致消息丟失
  • 集群模式下工作, 保證高可用

因為具有較高可靠性和一致性, rabbitmq 可以勝任訂單處理、秒殺等一致性要求較高的業務場景。

rabbitmq 概念與機制

rabbitmq 中的概念模型:

  • broker: 消息中間件實例, 可能是單個節點也可能是運行在多節點集群上的邏輯實體
  • 消息(message): 消息由消息頭和消息體兩部分組成。消息頭中包括routing-key、priority等標準消息頭以及其它自定義消息頭,用于定義rabbitmq對消息行為。消息體是字節流,包含消息內容。
  • 連接(connection): 客戶端與 broker 之間的 tcp連接
  • 信道(channel): channel 是建立在 tcp 連接上的邏輯(虛擬)連接。多個 channel 復用同一個 tcp 連接, 以避免建立 tcp 連接的巨大開銷。 rabbitmq 官方要求每個線程使用獨立的 channel, 禁止多個線程共用 channel。
  • 生產者(publisher): 發送消息的客戶端線程
  • 消費者(consumer): 處理消息的客戶端線程
  • 交換機(exchange): 交換機負責將消息投遞到相應的隊列
  • 隊列(queue): 接收并保存交換機投遞的消息,直至被消費者成功消費。邏輯結構遵循先進先出fifo。
  • 綁定(binding): 將隊列(queue)注冊到交換機(exchange)的路由表
  • 虛擬主機(vhost): 每個broker下可建立多個vhost, 每個 vhost 可建立獨立的 exchange、queue、綁定及權限系統。同一個 broker 下的 vhost 共享 connection、channel 和 用戶系統,就是說可以使用同一個用戶身份使用同一個 channel 訪問不同 vhost。

交換機(exchange)

生產者發送的消息會首先送到交換機(exchange), 交換機根據自身類型和消息的 routing-key 等信息將消息投遞到綁定的消息隊列中。

rabbitmq中的四種標準交換機:

direct: 如果消息的 routing-key 與隊列的 binding-key 完全相同,direct類型的交換機則會將消息投遞到該隊列中。

  • 多個隊列可以使用相同的 binding-key 綁定到同一個 direct 交換機,direct 交換機會把消息投遞到所有 binding-key 與消息 routing-key 相同的隊列

topic: 允許隊列的 binding-key 中包含通配符*和#, topic 交換機會將消息投遞到 binding-key 與 routing-key 匹配的隊列中。

  • 通配符按照關鍵字進行匹配,如news.cn.a中的關鍵字是news、cn和a,即關鍵字按照.分割
  • #通配符匹配0個或多個關鍵字, news.#.a可以匹配news.a, news.cn.a和news.asia.cn.a等
  • *通配符匹配一個關鍵字, news.*.a匹配news.cn.a不匹配news.a、news.asia.cn.a

fanout: fanout 交換機不進行任何匹配, 將消息投遞到所有綁定的隊列

header: header 交換機根據消息頭進行投遞,現在已較少使用

我們可以使用 rabbitmq 的插件機制使用第三方交換機或自行開發交換機。如實現延時投遞的。

消息頭中的delivery-mode可以設置為 persistent(持久化) 或者 transient(易失)。 exchange 和 queue 在處理持久化的消息時都會先將消息寫入磁盤中再進行下一步處理, 即使 rabbitmq 崩潰也不會丟失。

消費者客戶端通常使用的channel.basicconsume使用推(push)模式投遞消息, 即當有新消息時 broker 通過 channel 主動向客戶端發送消息。客戶端也可以使用channel.basicget從 broker 拉取消息。

ack機制

rabbitmq 提供了確認送達(acknowledge)機制保證消息被正確處理不會丟失。

確認送達的回執有三種:

  • ack: 消息已被成功處理
  • nack: 消息處理異常, 需要重新投遞
  • reject: 消息非法, 丟棄消息

rabbitmq 的 queue 可以設置 no_ack=true, 則消息被投遞后即刪除不等待回執。

channel.basicconsume 可以指定auto_ack模式,若auto_ack=true當客戶端收到完整消息后即會自動發出ack回執,否則必須顯式的發出回執。

java 代碼示例

首先安裝并啟動rabbitmq實例, mac用戶可以使用 homebrew 進行安裝:

?
1
brew install rabbitmq

啟動服務:

?
1
brew services start rabbitmq

或者使用官方docker鏡像:

?
1
docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3-management

rabbitmq官網提供了ubuntu、rpm以及windows等多種平臺安裝方式。

rabbitmq默認tcp端口為5672, web控制臺默認端口15672。

在maven中添加依賴:

?
1
2
3
4
5
<dependency>
 <groupid>com.rabbitmq</groupid>
 <artifactid>amqp-client</artifactid>
 <version>5.5.1</version>
</dependency>

編寫生產者:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package rabbit;
 
import java.io.ioexception;
import java.util.concurrent.timeoutexception;
 
import com.rabbitmq.client.amqp;
import com.rabbitmq.client.channel;
import com.rabbitmq.client.connection;
import com.rabbitmq.client.connectionfactory;
 
/**
 * @author finley
 */
public class rabbitproducer {
 
 public static void main(string[] args) throws ioexception, timeoutexception {
  connectionfactory factory = new connectionfactory();
  factory.setusername("guest");
  factory.setpassword("guest");
  factory.sethost("localhost");
  try (connection conn = factory.newconnection();
    channel channel = conn.createchannel()) {
   string exchangename = "test-exchange";
   channel.exchangedeclare(exchangename, "direct", true);
 
   string routingkey = "hello";
 
   byte[] msg = "hello world".getbytes();
   amqp.basicproperties.builder propsbuilder = new amqp.basicproperties.builder();
   propsbuilder.deliverymode(2); // persistent
   propsbuilder.priority(0); // normal
   propsbuilder.contenttype("text/plain");
   channel.basicpublish(exchangename, routingkey, propsbuilder.build(), msg);
  }
 }
}

編寫消費者:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package rabbit;
 
import java.io.ioexception;
import java.util.concurrent.timeoutexception;
 
import com.rabbitmq.client.*;
 
/**
 * @author finley
 */
public class rabbitconsumer {
 
 public static void main(string[] args) throws ioexception, timeoutexception {
  connectionfactory factory = new connectionfactory();
  factory.setusername("guest");
  factory.setpassword("guest");
  factory.sethost("localhost");
  try (connection conn = factory.newconnection();
    channel channel = conn.createchannel()) {
   string exchangename = "test-exchange";
   channel.exchangedeclare(exchangename, "direct", true);
 
   string queuename = channel.queuedeclare().getqueue();
   string bindingkey = "hello";
   channel.queuebind(queuename, exchangename, bindingkey);
 
   while(true) {
    channel.basicconsume(queuename, false, "", new defaultconsumer(channel) {
     @override
     public void handledelivery(string consumertag, envelope envelope, amqp.basicproperties properties, byte[] body) throws ioexception {
      string routingkey = envelope.getroutingkey();
      string contenttype = properties.getcontenttype();
      string bodystr = new string(body, "utf-8");
      system.out.println("routingkey: " + routingkey + ", contenttype: " + contenttype + ", body: " + bodystr);
      long deliverytag = envelope.getdeliverytag();
      channel.basicack(deliverytag, false);
     }
    });
   }
  }
 }
 
}

rabbitmq 的消息為字節, 可以將 java 對象序列化后作為消息體發送。

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。

原文鏈接:https://www.cnblogs.com/Finley/p/10126315.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 欧美激情综合 | 俄罗斯三级在线观看级 | 精品蜜臀AV在线天堂 | 亚洲乱码一二三四区国产 | 18hdxxxx日本护士| 黑人巨 | 亚洲高清在线天堂精品 | 日韩成a人片在线观看日本 日韩不卡一区二区 | 91精品91| 免费观看在线永久免费xx视频 | 美女脱了内裤让男桶爽 | 精品欧美小视频在线观看 | 亚洲精品国产自在现线最新 | 青青青青青| 99视频精品全部免费观看 | 免费观看韩剧网站在线观看 | 国产一级一级片 | 精品视频在线免费观看 | 国产日韩欧美色视频色在线观看 | 古装一级无遮挡毛片免费观看 | 九9热这里真品 | 日本一区二区三区久久 | 国产目拍亚洲精品一区二区三区 | 欧美性色黄大片四虎影视 | 亚洲精品一区二区三区在线看 | 国自产在线精品免费 | 狠狠鲁视频 | japanese日本护士 | 热99精品只有里视频最新 | 国产精品色拉拉免费看 | 校花被老头夺去第一次动图 | www在线视频在线播放 | 91精品乱码一区二区三区 | 亚洲男人的天堂成人 | 亚洲男人天堂影院 | 能播放18xxx18女同 | 日韩欧美高清 | 三年片韩国在线观看 | 日本黄a | 91综合在线视频 | 成人做视频免费 |