前言
本系列demo均以springboot快速構建,基本包使用到lombok(一個便捷的對象構造工具 get/set)、spring-boot-starter-web,使用springboot僅為了快速構建sample項目,對于學習spring的對應功能無影響。
我們希望你已經有一定的java基礎與了解一個自己喜歡的idea功能,謝謝。
github
地址:https://github.com/unclecatmyself/spring-tutorial
學習
完成設置發布和訂閱消息的rabbitmq amqp服務器的過程。
構建
構建一個使用spring amqp發布消息的應用程序,rabbittemplate并使用pojo訂閱消息messagelisteneradapter。
創建rabbit mq消息接收器
使用任何基于消息傳遞的應用程序,您需要創建一個響應已發布消息的接收器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@slf4j @component public class receiver { private countdownlatch latch = new countdownlatch( 1 ); public void receivemessage(string message){ log.info( "received < " + message + " >" ); latch.countdown(); } public countdownlatch getlatch(){ return latch; } } |
receiver是一個簡單的pojo,它定義了一種接收消息的方法。當您注冊它以接收消息時,您可以將其命名為任何您想要的名稱。
為方便起見,這個pojo也有一個countdownlatch。這允許它發信號通知接收到消息。這是您不太可能在生產應用程序中實現的。
注冊監聽器并發送消息
spring amqp rabbittemplate 提供了使用rabbitmq發送和接收消息所需的一切。具體來說,你需要配置:
- 消息偵聽器容器
- 聲明隊列,交換以及它們之間的綁定
- 用于發送一些消息以測試偵聽器的組件
spring boot會自動創建連接工廠和rabbittemplate,從而減少您必須編寫的代碼量。
您將使用rabbittemplate發送消息,并將receiver使用消息偵聽器容器注冊,以接收消息。連接工廠驅動兩者,允許它們連接到rabbitmq服務器。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
@springbootapplication public class rabbitmqapplication { static final string topicexchangename = "spring-boot-exchange" ; static final string queuename = "spring-boot" ; @bean queue queue(){ return new queue(queuename, false ); } @bean topicexchange exchange(){ return new topicexchange(topicexchangename); } @bean binding binding(queue queue,topicexchange exchange){ return bindingbuilder.bind(queue).to(exchange).with( "foo.bar.#" ); } @bean simplemessagelistenercontainer container(connectionfactory connectionfactory, messagelisteneradapter listeneradapter){ simplemessagelistenercontainer container = new simplemessagelistenercontainer(); container.setconnectionfactory(connectionfactory); container.setqueuenames(queuename); container.setmessagelistener(listeneradapter); return container; } @bean messagelisteneradapter listeneradapter(receiver receiver){ return new messagelisteneradapter(receiver, "receivemessage" ); } public static void main(string[] args) { springapplication.run(rabbitmqapplication. class , args).close(); } } |
@springbootapplication 是一個便利注釋,添加了以下所有內容:
- @configuration 標記該類作為應用程序上下文的bean定義的源。
- @enableautoconfiguration 告訴spring boot開始根據類路徑設置,其他bean和各種屬性設置添加bean。
- 通常你會添加@enablewebmvc一個spring mvc應用程序,但spring boot會在類路徑上看到spring-webmvc時自動添加它。這會將應用程序標記為web應用程序并激活關鍵行為,例如設置a dispatcherservlet。
- @componentscan告訴spring在包中尋找其他組件,配置和服務hello,允許它找到控制器。
該main()方法使用spring boot的springapplication.run()方法來啟動應用程序。您是否注意到沒有一行xml?也沒有web.xml文件。此web應用程序是100%純java,您無需處理配置任何管道或基礎結構。
listeneradapter()方法中定義的bean在定義的容器中注冊為消息偵聽器container()。它將偵聽“spring-boot”隊列中的消息。因為receiver該類是pojo,所以需要將其包裝在messagelisteneradapter指定要調用的位置receivemessage。
jms隊列和amqp隊列具有不同的語義。例如,jms僅向一個使用者發送排隊的消息。雖然amqp隊列執行相同的操作,但amqp生成器不會直接向隊列發送消息。相反,消息被發送到交換機,交換機可以轉到單個隊列,或扇出到多個隊列,模仿jms主題的概念。
消息監聽器容器和接收器bean是您監聽消息所需的全部內容。要發送消息,您還需要一個rabbit模板。
該queue()方法創建amqp隊列。該exchange()方法創建主題交換。該binding()方法將這兩者綁定在一起,定義rabbittemplate發布到交換時發生的行為。
spring amqp要求將the queue,the topicexchange,和binding聲明為頂級spring bean才能正確設置。
在這種情況下,我們使用主題交換,并且隊列與路由密鑰綁定,foo.bar.#這意味著使用以路由鍵開頭的任何消息foo.bar.將被路由到隊列。
發送測試消息
測試消息由commandlinerunner,他還等待接收器中的鎖存器并關閉應用程序上下文:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@slf4j @component public class runner implements commandlinerunner { private final rabbittemplate rabbittemplate; private final receiver receiver; public runner(receiver receiver, rabbittemplate rabbittemplate){ this .receiver = receiver; this .rabbittemplate = rabbittemplate; } @override public void run(string... strings) throws exception { log.info( "sending message...." ); rabbittemplate.convertandsend(rabbitmqapplication.topicexchangename, "foo.bar.baz" , "hello from rabbitmq!" ); receiver.getlatch().await( 10000 , timeunit.milliseconds); } } |
請注意,模板將消息路由到交換機,其路由密鑰foo.bar.baz與綁定匹配。
可以在測試中模擬出運行器,以便可以單獨測試接收器。
運行程序,你應該看到如下輸出:
1
2
3
4
5
6
7
8
|
2018 - 12 - 03 10 : 23 : 46.779 info 10828 --- [ main] o.s.b.w.embedded.tomcat.tomcatwebserver : tomcat started on port(s): 8080 (http) with context path '' 2018 - 12 - 03 10 : 23 : 46.782 info 10828 --- [ main] c.g.unclecatmyself.rabbitmqapplication : started rabbitmqapplication in 3.61 seconds (jvm running for 4.288 ) 2018 - 12 - 03 10 : 23 : 46.784 info 10828 --- [ main] com.github.unclecatmyself.runner : sending message.... 2018 - 12 - 03 10 : 23 : 46.793 info 10828 --- [ container- 1 ] com.github.unclecatmyself.receiver : received < hello from rabbitmq! > 2018 - 12 - 03 10 : 23 : 46.799 info 10828 --- [ main] o.s.a.r.l.simplemessagelistenercontainer : waiting for workers to finish. 2018 - 12 - 03 10 : 23 : 47.813 info 10828 --- [ main] o.s.a.r.l.simplemessagelistenercontainer : successfully waited for workers to finish. 2018 - 12 - 03 10 : 23 : 47.815 info 10828 --- [ main] o.s.s.concurrent.threadpooltaskexecutor : shutting down executorservice 'applicationtaskexecutor' 2018 - 12 - 03 10 : 23 : 47.816 info 10828 --- [ main] o.s.a.r.l.simplemessagelistenercontainer : shutdown ignored - container is not active already |
結尾
恭喜!您剛剛使用spring和rabbitmq開發了一個簡單的發布 - 訂閱應用程序。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。
原文鏈接:https://www.imooc.com/article/266497