Introduction
RabbitMQ is a message broker that implements AMQP (Advanced Message Queuing Protocol). It stores and forwards messages in distributed systems and performs well in ease of use, scalability, and high availability.
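Concepts:
- Producer: the party that creates messages and pushes them to the message queue (in RabbitMQ, by publishing to an exchange)
- Consumer: the final recipient of a message; it listens on a queue and consumes the messages that arrive there
- Queue: the buffer for messages; it holds the messages sent by producers until they are consumed
- Exchange: distributes the messages produced by producers to queues according to a set of rules
- Binding: the link between an exchange and a queue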
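Modes (exchange types):
- direct: direct mode; a message goes to the queues whose binding key exactly matches its routing key. Used for distributing tasks between instances
- topic: topic mode; a message goes to the queues bound to the exchange whose configurable pattern matches its routing key (`*` matches one word, `#` matches zero or more words)
- headers: suited to complex distribution rules; the rules are expressed through arguments in the message headers
- fanout: a message goes to every queue bound to the exchange, and the routing key is ignored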
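The routing rules above can be sketched in plain Java. This is a toy in-memory model, not RabbitMQ code: the class name `RoutingSketch`, the queue names, and the binding keys are all made up for illustration, and the headers type is left out for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of how an exchange picks target queues. Not RabbitMQ code. */
class RoutingSketch {

    /** topic rule: dot-separated words; '*' matches one word, '#' matches zero or more. */
    static boolean topicMatches(String pattern, String routingKey) {
        return matchWords(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matchWords(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = j; next <= k.length; next++) {
                if (matchWords(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matchWords(p, i + 1, k, j + 1);
        }
        return false;
    }

    /** bindings maps queue name -> binding key; returns the queues that receive the message. */
    static List<String> route(String type, Map<String, String> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> b : bindings.entrySet()) {
            boolean hit;
            switch (type) {
                case "direct": hit = b.getValue().equals(routingKey); break;
                case "topic":  hit = topicMatches(b.getValue(), routingKey); break;
                case "fanout": hit = true; break; // routing key is ignored
                default: throw new IllegalArgumentException("unsupported type: " + type);
            }
            if (hit) {
                targets.add(b.getKey());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("userQueue", "issue.user");
        bindings.put("cityQueue", "issue.city.*");
        bindings.put("allQueue", "issue.#");
        System.out.println(route("direct", bindings, "issue.user"));
        System.out.println(route("topic", bindings, "issue.city.shanghai"));
        System.out.println(route("fanout", bindings, "anything"));
    }
}
```

With the same three bindings, the exchange type alone decides which queues receive a message, which is why the type is chosen when the exchange is declared.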
Integrating RabbitMQ with Spring Boot
1. Add the Maven dependency

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>1.5.2.RELEASE</version>
    </dependency>
2. Configure application.properties

    # RabbitMQ
    spring.rabbitmq.host=dev-mq.a.pa.com
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    spring.rabbitmq.virtual-host=/message-test/
3. Write the AmqpConfiguration configuration class

    package message.test.configuration;

    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class AmqpConfiguration {

        /**
         * Message encoding
         */
        public static final String MESSAGE_ENCODING = "utf-8";

        public static final String EXCHANGE_ISSUE = "exchange_message_issue";
        public static final String QUEUE_ISSUE_USER = "queue_message_issue_user";
        public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user";
        public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device";
        public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city";
        public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user";
        public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user";
        public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device";
        public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city";

        public static final String EXCHANGE_PUSH = "exchange_message_push";
        public static final String QUEUE_PUSH_RESULT = "queue_message_push_result";

        @Autowired
        private RabbitProperties rabbitProperties;

        @Bean
        public Queue issueUserQueue() {
            return new Queue(QUEUE_ISSUE_USER);
        }

        @Bean
        public Queue issueAllUserQueue() {
            return new Queue(QUEUE_ISSUE_ALL_USER);
        }

        @Bean
        public Queue issueAllDeviceQueue() {
            return new Queue(QUEUE_ISSUE_ALL_DEVICE);
        }

        @Bean
        public Queue issueCityQueue() {
            return new Queue(QUEUE_ISSUE_CITY);
        }

        @Bean
        public Queue pushResultQueue() {
            return new Queue(QUEUE_PUSH_RESULT);
        }

        @Bean
        public DirectExchange issueExchange() {
            return new DirectExchange(EXCHANGE_ISSUE);
        }

        @Bean
        public DirectExchange pushExchange() {
            // Argument 1: exchange name
            // Argument 2: whether the exchange is durable
            // Argument 3: whether the exchange is auto-deleted
            return new DirectExchange(EXCHANGE_PUSH, true, true);
        }

        @Bean
        public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue,
                @Qualifier("issueExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER);
        }

        @Bean
        public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue,
                @Qualifier("issueExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER);
        }

        @Bean
        public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue,
                @Qualifier("issueExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE);
        }

        @Bean
        public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue,
                @Qualifier("issueExchange") DirectExchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY);
        }

        @Bean
        public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue,
                @Qualifier("pushExchange") DirectExchange exchange) {
            // Uses the queue name itself as the routing key
            return BindingBuilder.bind(queue).to(exchange).withQueueName();
        }

        @Bean
        public ConnectionFactory defaultConnectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setHost(rabbitProperties.getHost());
            connectionFactory.setPort(rabbitProperties.getPort());
            connectionFactory.setUsername(rabbitProperties.getUsername());
            connectionFactory.setPassword(rabbitProperties.getPassword());
            connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
            return connectionFactory;
        }

        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
                @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            // Listeners must acknowledge each message themselves
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }

        @Bean
        public AmqpTemplate rabbitTemplate(
                @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
            return new RabbitTemplate(connectionFactory);
        }
    }
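4. Write the producer

    // Serialize the message to JSON, then encode it as bytes.
    // getBytes(String) declares the checked UnsupportedEncodingException.
    byte[] body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);
    rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE,
            AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);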
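The producer turns a JSON string into a `byte[]` body, and the consumer has to reverse that step with the same encoding. A minimal sketch of the round trip, using only the JDK: `PayloadCodec` is a hypothetical helper that is not part of the original project, and the JSON string is written by hand instead of being produced by a JSON library. Using a `Charset` constant rather than a charset name avoids the checked `UnsupportedEncodingException`.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: converts a JSON string to and from the byte[] message body. */
class PayloadCodec {

    // Same encoding as AmqpConfiguration.MESSAGE_ENCODING, expressed as a Charset.
    static final Charset ENCODING = StandardCharsets.UTF_8;

    /** Producer side: JSON text -> message body. */
    static byte[] encode(String json) {
        return json.getBytes(ENCODING);
    }

    /** Consumer side: message body -> JSON text. */
    static String decode(byte[] body) {
        return new String(body, ENCODING);
    }

    public static void main(String[] args) {
        // Hand-written JSON with non-ASCII characters (\u6d88\u606f is "message" in Chinese).
        String json = "{\"userId\":42,\"title\":\"\u6d88\u606f\"}";
        byte[] body = encode(json);
        System.out.println("round trip ok: " + decode(body).equals(json));
    }
}
```

Keeping both directions in one place makes it harder for the producer and the consumer to drift apart on the encoding.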
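5. Write the consumer

    @RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)
    public void handlePushResult(@Payload byte[] data, Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        // Process the message here. The listener container factory uses
        // AcknowledgeMode.MANUAL, so the handler has to acknowledge the delivery
        // itself, for example with channel.basicAck(deliveryTag, false).
    }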
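Because the listener container factory is set to manual acknowledgement, the handler decides when a delivery counts as done. The sketch below shows one common pattern: ack after successful processing, nack without requeue on failure. To stay runnable without the RabbitMQ client on the classpath it uses a stand-in interface, `AckChannel`, which mirrors two methods of the real `com.rabbitmq.client.Channel` (the real methods also declare `IOException`). `PushResultHandler`, `RecordingChannel`, and the `process` logic are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the two acknowledgement methods of com.rabbitmq.client.Channel. */
interface AckChannel {
    void basicAck(long deliveryTag, boolean multiple);
    void basicNack(long deliveryTag, boolean multiple, boolean requeue);
}

class PushResultHandler {

    /** Hypothetical business logic: rejects empty payloads. */
    void process(String payload) {
        if (payload.isEmpty()) {
            throw new IllegalArgumentException("empty payload");
        }
    }

    /** Acks after successful processing; nacks without requeue on failure. */
    void handle(byte[] data, AckChannel channel, long deliveryTag) {
        try {
            process(new String(data, StandardCharsets.UTF_8));
            channel.basicAck(deliveryTag, false);   // false: acknowledge this delivery only
        } catch (RuntimeException e) {
            channel.basicNack(deliveryTag, false, false); // last false: do not requeue
        }
    }
}

/** Recording fake used to observe which call the handler makes. */
class RecordingChannel implements AckChannel {
    final List<String> calls = new ArrayList<>();

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        calls.add("ack:" + deliveryTag);
    }

    @Override
    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) {
        calls.add("nack:" + deliveryTag + ":requeue=" + requeue);
    }
}
```

Whether a failed message should be requeued is a design choice. With `requeue` set to false the broker discards the message unless a dead-letter exchange is configured for the queue, while setting it to true can cause a message that always fails to be redelivered in a loop.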
Original link: https://segmentfault.com/a/1190000018555963