一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - SpringBoot+RabbitMQ 實現 RPC 調用

SpringBoot+RabbitMQ 實現 RPC 調用

2021-11-15 21:57江南一點雨 Java教程

說到 RPC(Remote Procedure Call Protocol 遠程過程調用協議),小伙伴們腦海里蹦出的估計都是 RESTful API、Dubbo、WebService、Java RMI、CORBA 等。

SpringBoot+RabbitMQ 實現 RPC 調用

說到 RPC(Remote Procedure Call Protocol 遠程過程調用協議),小伙伴們腦海里蹦出的估計都是 RESTful API、Dubbo、WebService、Java RMI、CORBA 等。

其實,RabbitMQ 也給我們提供了 RPC 功能,并且使用起來很簡單。

今天松哥通過一個簡單的案例來和大家分享一下 Spring Boot+RabbitMQ 如何實現一個簡單的 RPC 調用。

注意

關于 RabbitMQ 實現 RPC 調用,有的小伙伴可能會有一些誤解,心想這還不簡單?搞兩個消息隊列 queue_1 和 queue_2,首先客戶端發送消息到 queue_1 上,服務端監聽 queue_1 上的消息,收到之后進行處理;處理完成后,服務端發送消息到 queue_2 隊列上,然后客戶端監聽 queue_2 隊列上的消息,這樣就知道服務端的處理結果了。

這種方式不是不可以,就是有點麻煩!RabbitMQ 中提供了現成的方案可以直接使用,非常方便。接下來我們就一起來學習下。

1. 架構

先來看一個簡單的架構圖:

SpringBoot+RabbitMQ 實現 RPC 調用

這張圖把問題說的很明白了:

首先 Client 發送一條消息,和普通的消息相比,這條消息多了兩個關鍵內容:一個是 correlation_id,這個表示這條消息的唯一 id,還有一個內容是 reply_to,這個表示消息回復隊列的名字。

Server 從消息發送隊列獲取消息并處理相應的業務邏輯,處理完成后,將處理結果發送到 reply_to 指定的回調隊列中。

Client 從回調隊列中讀取消息,就可以知道消息的執行情況是什么樣子了。

這種情況其實非常適合處理異步調用。

2. 實踐

接下來我們通過一個具體的例子來看看這個怎么玩。

2.1 客戶端開發

首先我們來創建一個 Spring Boot 工程名為 producer,作為消息生產者,創建時候添加 web 和 rabbitmq 依賴,如下圖:

SpringBoot+RabbitMQ 實現 RPC 調用

項目創建成功之后,首先在 application.properties 中配置 RabbitMQ 的基本信息,如下:

  1. spring.rabbitmq.host=localhost
  2. spring.rabbitmq.port=5672
  3. spring.rabbitmq.username=guest
  4. spring.rabbitmq.password=guest
  5. spring.rabbitmq.publisher-confirm-type=correlated
  6. spring.rabbitmq.publisher-returns=true

這個配置前面四行都好理解,我就不贅述,后面兩行:首先是配置消息確認方式,我們通過 correlated 來確認,只有開啟了這個配置,將來的消息中才會帶 correlation_id,只有通過 correlation_id 我們才能將發送的消息和返回值之間關聯起來。最后一行配置則是開啟發送失敗退回。

接下來我們來提供一個配置類,如下:

  1. /**
  2. * @author 江南一點雨
  3. * @微信公眾號 江南一點雨
  4. * @網站 http://www.itboyhub.com
  5. * @國際站 http://www.javaboy.org
  6. * @微信 a_java_boy
  7. * @GitHub https://github.com/lenve
  8. * @Gitee https://gitee.com/lenve
  9. */
  10. @Configuration
  11. public class RabbitConfig {
  12. public static final String RPC_QUEUE1 = "queue_1";
  13. public static final String RPC_QUEUE2 = "queue_2";
  14. public static final String RPC_EXCHANGE = "rpc_exchange";
  15. /**
  16. * 設置消息發送RPC隊列
  17. */
  18. @Bean
  19. Queue msgQueue() {
  20. return new Queue(RPC_QUEUE1);
  21. }
  22. /**
  23. * 設置返回隊列
  24. */
  25. @Bean
  26. Queue replyQueue() {
  27. return new Queue(RPC_QUEUE2);
  28. }
  29. /**
  30. * 設置交換機
  31. */
  32. @Bean
  33. TopicExchange exchange() {
  34. return new TopicExchange(RPC_EXCHANGE);
  35. }
  36. /**
  37. * 請求隊列和交換器綁定
  38. */
  39. @Bean
  40. Binding msgBinding() {
  41. return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
  42. }
  43. /**
  44. * 返回隊列和交換器綁定
  45. */
  46. @Bean
  47. Binding replyBinding() {
  48. return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
  49. }
  50. /**
  51. * 使用 RabbitTemplate發送和接收消息
  52. * 并設置回調隊列地址
  53. */
  54. @Bean
  55. RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
  56. RabbitTemplate template = new RabbitTemplate(connectionFactory);
  57. template.setReplyAddress(RPC_QUEUE2);
  58. template.setReplyTimeout(6000);
  59. return template;
  60. }
  61. /**
  62. * 給返回隊列設置監聽器
  63. */
  64. @Bean
  65. SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
  66. SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
  67. container.setConnectionFactory(connectionFactory);
  68. container.setQueueNames(RPC_QUEUE2);
  69. container.setMessageListener(rabbitTemplate(connectionFactory));
  70. return container;
  71. }
  72. }

這個配置類中我們分別配置了消息發送隊列 msgQueue 和消息返回隊列 replyQueue,然后將這兩個隊列和消息交換機進行綁定。這個都是 RabbitMQ 的常規操作,沒啥好說的。

在 Spring Boot 中我們負責消息發送的工具是 RabbitTemplate,默認情況下,系統自動提供了該工具,但是這里我們需要對該工具重新進行定制,主要是添加消息發送的返回隊列,最后我們還需要給返回隊列設置一個監聽器。

好啦,接下來我們就可以開始具體的消息發送了:

  1. /**
  2. * @author 江南一點雨
  3. * @微信公眾號 江南一點雨
  4. * @網站 http://www.itboyhub.com
  5. * @國際站 http://www.javaboy.org
  6. * @微信 a_java_boy
  7. * @GitHub https://github.com/lenve
  8. * @Gitee https://gitee.com/lenve
  9. */
  10. @RestController
  11. public class RpcClientController {
  12. private static final Logger logger = LoggerFactory.getLogger(RpcClientController.class);
  13. @Autowired
  14. private RabbitTemplate rabbitTemplate;
  15. @GetMapping("/send")
  16. public String send(String message) {
  17. // 創建消息對象
  18. Message newMessage = MessageBuilder.withBody(message.getBytes()).build();
  19. logger.info("client send:{}", newMessage);
  20. //客戶端發送消息
  21. Message result = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE1, newMessage);
  22. String response = "";
  23. if (result != null) {
  24. // 獲取已發送的消息的 correlationId
  25. String correlationId = newMessage.getMessageProperties().getCorrelationId();
  26. logger.info("correlationId:{}", correlationId);
  27. // 獲取響應頭信息
  28. HashMap headers = (HashMap) result.getMessageProperties().getHeaders();
  29. // 獲取 server 返回的消息 id
  30. String msgId = (String) headers.get("spring_returned_message_correlation");
  31. if (msgId.equals(correlationId)) {
  32. response = new String(result.getBody());
  33. logger.info("client receive:{}", response);
  34. }
  35. }
  36. return response;
  37. }
  38. }

這塊的代碼其實也都是一些常規代碼,我挑幾個關鍵的節點說下:

  • 消息發送調用 sendAndReceive 方法,該方法自帶返回值,返回值就是服務端返回的消息。
  • 服務端返回的消息中,頭信息中包含了 spring_returned_message_correlation 字段,這個就是消息發送時候的 correlation_id,通過消息發送時候的 correlation_id 以及返回消息頭中的 spring_returned_message_correlation 字段值,我們就可以將返回的消息內容和發送的消息綁定到一起,確認出這個返回的內容就是針對這個發送的消息的。

這就是整個客戶端的開發,其實最最核心的就是 sendAndReceive 方法的調用。調用雖然簡單,但是準備工作還是要做足夠。例如如果我們沒有在 application.properties 中配置 correlated,發送的消息中就沒有 correlation_id,這樣就無法將返回的消息內容和發送的消息內容關聯起來。

2.2 服務端開發

再來看看服務端的開發。

首先創建一個名為 consumer 的 Spring Boot 項目,創建項目添加的依賴和客戶端開發創建的依賴是一致的,不再贅述。

然后配置 application.properties 配置文件,該文件的配置也和客戶端中的配置一致,不再贅述。

接下來提供一個 RabbitMQ 的配置類,這個配置類就比較簡單,單純的配置一下消息隊列并將之和消息交換機綁定起來,如下:

  1. /**
  2. * @author 江南一點雨
  3. * @微信公眾號 江南一點雨
  4. * @網站 http://www.itboyhub.com
  5. * @國際站 http://www.javaboy.org
  6. * @微信 a_java_boy
  7. * @GitHub https://github.com/lenve
  8. * @Gitee https://gitee.com/lenve
  9. */
  10. @Configuration
  11. public class RabbitConfig {
  12. public static final String RPC_QUEUE1 = "queue_1";
  13. public static final String RPC_QUEUE2 = "queue_2";
  14. public static final String RPC_EXCHANGE = "rpc_exchange";
  15. /**
  16. * 配置消息發送隊列
  17. */
  18. @Bean
  19. Queue msgQueue() {
  20. return new Queue(RPC_QUEUE1);
  21. }
  22. /**
  23. * 設置返回隊列
  24. */
  25. @Bean
  26. Queue replyQueue() {
  27. return new Queue(RPC_QUEUE2);
  28. }
  29. /**
  30. * 設置交換機
  31. */
  32. @Bean
  33. TopicExchange exchange() {
  34. return new TopicExchange(RPC_EXCHANGE);
  35. }
  36. /**
  37. * 請求隊列和交換器綁定
  38. */
  39. @Bean
  40. Binding msgBinding() {
  41. return BindingBuilder.bind(msgQueue()).to(exchange()).with(RPC_QUEUE1);
  42. }
  43. /**
  44. * 返回隊列和交換器綁定
  45. */
  46. @Bean
  47. Binding replyBinding() {
  48. return BindingBuilder.bind(replyQueue()).to(exchange()).with(RPC_QUEUE2);
  49. }
  50. }

最后我們再來看下消息的消費:

  1. @Component
  2. public class RpcServerController {
  3. private static final Logger logger = LoggerFactory.getLogger(RpcServerController.class);
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @RabbitListener(queues = RabbitConfig.RPC_QUEUE1)
  7. public void process(Message msg) {
  8. logger.info("server receive : {}",msg.toString());
  9. Message response = MessageBuilder.withBody(("i'm receive:"+new String(msg.getBody())).getBytes()).build();
  10. CorrelationData correlationData = new CorrelationData(msg.getMessageProperties().getCorrelationId());
  11. rabbitTemplate.sendAndReceive(RabbitConfig.RPC_EXCHANGE, RabbitConfig.RPC_QUEUE2, response, correlationData);
  12. }
  13. }

這里的邏輯就比較簡單了:

  • 服務端首先收到消息并打印出來。
  • 服務端提取出原消息中的 correlation_id。
  • 服務端調用 sendAndReceive 方法,將消息發送給 RPC_QUEUE2 隊列,同時帶上 correlation_id 參數。

服務端的消息發出后,客戶端將收到服務端返回的結果。

OK,大功告成。

2.3 測試

接下來我們進行一個簡單測試。

首先啟動 RabbitMQ。

接下來分別啟動 producer 和 consumer,然后在 postman 中調用 producer 的接口進行測試,如下:

SpringBoot+RabbitMQ 實現 RPC 調用

可以看到,已經收到了服務端的返回信息。

來看看 producer 的運行日志:

SpringBoot+RabbitMQ 實現 RPC 調用

可以看到,消息發送出去后,同時也收到了 consumer 返回的信息。

SpringBoot+RabbitMQ 實現 RPC 調用

可以看到,consumer 也收到了客戶端發來的消息。

3. 小結

好啦,一個小小的案例,帶小伙伴們體驗一把 RabbitMQ 實現 RPC 調用。

原文鏈接:https://mp.weixin.qq.com/s/wJrnWtX-648tzQHT39cwhg

延伸 · 閱讀

精彩推薦
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
主站蜘蛛池模板: 6969精品视频在线观看 | 四虎影院免费在线 | www久久| 久9视频这里只有精品123 | 奇米影视奇米色777欧美 | 精品国产一区二区三区久久久蜜臀 | 91入口免费网站大全 | 王小军怎么了最新消息 | 国产精品露脸国语对白手机视频 | 日本无吗免费一二区 | 久久中文字幕无线观看 | 波多野结在线观看 | 国产在线精品一区二区高清不卡 | 满溢游泳池免费土豪全集下拉版 | 色就色综合 | 亚洲精品91 | 欧美成人禁片在线观看俄罗斯 | 成人国产第一区在线观看 | xxxxx性13一14 | 学校捏奶揉下面污文h | 久久 这里只精品 免费 | 媳妇和公公小说 | 精品亚洲综合久久中文字幕 | 91制片厂制作果冻传媒八夷 | 国产成人a∨麻豆精品 | 久久99re热在线观看视频 | 国产区香蕉精品系列在线观看不卡 | 天堂俺去俺来也www久久婷婷 | 秋霞在线观看成人高清视频51 | 欧美日韩亚洲高清不卡一区二区三区 | 欧洲vodafonewi精品 | 久久er99热精品一区二区 | 成年人在线观看视频 | 欧美三级免费观看 | 嫩草视频在线观看免费 | 欧美xxoo做爰猛烈视频 | 国产特黄一级一片免费 | 国产麻豆剧果冻传媒观看免费视频 | 美女隐私部位视频网站 | 手机免费在线视频 | 久久99国产亚洲高清观着 |