一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - spring boot集成rabbitmq的實例教程

spring boot集成rabbitmq的實例教程

2021-01-29 11:10陳凡了 Java教程

這篇文章主要給大家介紹了關(guān)于spring boot集成rabbitmq的相關(guān)資料,springboot集成RabbitMQ非常簡單,文中通過示例代碼介紹的非常詳細,需要的朋友們可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。

一、RabbitMQ的介紹  

RabbitMQ是消息中間件的一種,消息中間件即分布式系統(tǒng)中完成消息的發(fā)送和接收的基礎(chǔ)軟件.這些軟件有很多,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現(xiàn)已經(jīng)轉(zhuǎn)讓給apache).

消息中間件的工作過程可以用生產(chǎn)者消費者模型來表示.即,生產(chǎn)者不斷的向消息隊列發(fā)送信息,而消費者從消息隊列中消費信息.具體過程如下:

spring boot集成rabbitmq的實例教程

從上圖可看出,對于消息隊列來說,生產(chǎn)者,消息隊列,消費者是最重要的三個概念,生產(chǎn)者發(fā)消息到消息隊列中去,消費者監(jiān)聽指定的消息隊列,并且當消息隊列收到消息之后,接收消息隊列傳來的消息,并且給予相應(yīng)的處理.消息隊列常用于分布式系統(tǒng)之間互相信息的傳遞.

對于RabbitMQ來說,除了這三個基本模塊以外,還添加了一個模塊,即交換機(Exchange).它使得生產(chǎn)者和消息隊列之間產(chǎn)生了隔離,生產(chǎn)者將消息發(fā)送給交換機,而交換機則根據(jù)調(diào)度策略把相應(yīng)的消息轉(zhuǎn)發(fā)給對應(yīng)的消息隊列.那么RabitMQ的工作流程如下所示:

spring boot集成rabbitmq的實例教程

緊接著說一下交換機.交換機的主要作用是接收相應(yīng)的消息并且綁定到指定的隊列.交換機有四種類型,分別為Direct,topic,headers,Fanout.

Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即創(chuàng)建消息隊列的時候,指定一個BindingKey.當發(fā)送者發(fā)送消息的時候,指定對應(yīng)的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被發(fā)送到該消息隊列中.

topic轉(zhuǎn)發(fā)信息主要是依據(jù)通配符,隊列和交換機的綁定主要是依據(jù)一種模式(通配符+字符串),而當發(fā)送消息的時候,只有指定的Key和該模式相匹配的時候,消息才會被發(fā)送到該消息隊列中.

headers也是根據(jù)一個規(guī)則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規(guī)則,而發(fā)送消息的時候也會指定一組鍵值對規(guī)則,當兩組鍵值對規(guī)則相匹配的時候,消息會被發(fā)送到匹配的消息隊列中.

Fanout是路由廣播的形式,將會把消息發(fā)給綁定它的全部隊列,即便設(shè)置了key,也會被忽略. 

概念:

  • 生產(chǎn)者 消息的產(chǎn)生方,負責將消息推送到消息隊列
  • 消費者 消息的最終接受方,負責監(jiān)聽隊列中的對應(yīng)消息,消費消息
  • 隊列 消息的寄存器,負責存放生產(chǎn)者發(fā)送的消息
  • 交換機 負責根據(jù)一定規(guī)則分發(fā)生產(chǎn)者產(chǎn)生的消息
  • 綁定 完成交換機和隊列之間的綁定

模式:

1、direct

直連模式,用于實例間的任務(wù)分發(fā)

2、topic

話題模式,通過可配置的規(guī)則分發(fā)給綁定在該exchange上的隊列

3、headers

適用規(guī)則復(fù)雜的分發(fā),用headers里的參數(shù)表達規(guī)則

4、fanout

分發(fā)給所有綁定到該exchange上的隊列,忽略routing key

安裝

單機版安裝很簡單,大概步驟如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 安裝erlang包
 yum install erlang
# 安裝socat
 yum install socat
# 安裝rabbit
 rpm -ivh rabbitmq-server-3.6.6-1.el6.noarch.rpm
# 啟動服務(wù)
 rabbitmq-server start
# 增加管理控制功能
 rabbitmq-plugins enable rabbitmq_management
# 增加用戶:
 sudo rabbitmqctl add_user root password
 rabbitmqctl set_user_tags root administrator
 rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

集群安裝,可參考這篇文章:

     rabbitmq集群安裝

以上就是rabbitmq的介紹,下面開始本文的正文:spring boot 集成rabbitmq ,本人在學(xué)習(xí)rabbitmq時發(fā)現(xiàn)網(wǎng)上很少有系統(tǒng)性介紹springboot和rabbitmq如何集成的,其他人總結(jié)的都片段化,所以結(jié)合個人調(diào)研過程,整理此篇文章。

二、springboot配置

廢話少說直接上代碼:

配置參數(shù)

application.yml:

?
1
2
3
4
5
6
7
spring:
 rabbitmq:
 addresses: 192.168.1.1:5672
 username: username
 password: password
 publisher-confirms: true
 virtual-host: /

java config讀取參數(shù)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * RabbitMq配置文件讀取類
 *
 * @author chenhf
 * @create 2017-10-23 上午9:31
 **/
@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMqConfig {
 
 @Value("${spring.rabbitmq.addresses}")
 private String addresses;
 @Value("${spring.rabbitmq.username}")
 private String username;
 @Value("${spring.rabbitmq.password}")
 private String password;
 @Value("${spring.rabbitmq.publisher-confirms}")
 private Boolean publisherConfirms;
 @Value("${spring.rabbitmq.virtual-host}")
 private String virtualHost;
 
 // 構(gòu)建mq實例工廠
 @Bean
 public ConnectionFactory connectionFactory(){
 CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
 connectionFactory.setAddresses(addresses);
 connectionFactory.setUsername(username);
 connectionFactory.setPassword(password);
 connectionFactory.setPublisherConfirms(publisherConfirms);
 connectionFactory.setVirtualHost(virtualHost);
 return connectionFactory;
 }
 
 @Bean
 public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
 return new RabbitAdmin(connectionFactory);
 }
 
 @Bean
 @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
 public RabbitTemplate rabbitTemplate(){
 RabbitTemplate template = new RabbitTemplate(connectionFactory());
 return template;
 }
}

三、rabbitmq生產(chǎn)者配置

主要配置了直連和話題模式,其中話題模式設(shè)置兩個隊列(queueTopicTest1、queueTopicTest2),此兩個隊列在和交換機綁定時分別設(shè)置不同的routingkey(.TEST.以及l(fā)azy.#)來驗證匹配模式。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
 * 用于配置交換機和隊列對應(yīng)關(guān)系
 * 新增消息隊列應(yīng)該按照如下步驟
 * 1、增加queue bean,參見queueXXXX方法
 * 2、增加queue和exchange的binding
 * @author chenhf
 * @create 2017-10-23 上午10:33
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class RabbitMqExchangeConfig {
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqExchangeConfig.class);
 
 /**
 * @Author:chenhf
 * @Description: 主題型交換機
 * @Date:下午5:49 2017/10/23
 * @param
 * @return
 */
 @Bean
 TopicExchange contractTopicExchangeDurable(RabbitAdmin rabbitAdmin){
 TopicExchange contractTopicExchange = new TopicExchange(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode());
 rabbitAdmin.declareExchange(contractTopicExchange);
 logger.debug("完成主題型交換機bean實例化");
 return contractTopicExchange;
 }
 /**
 * 直連型交換機
 */
 @Bean
 DirectExchange contractDirectExchange(RabbitAdmin rabbitAdmin) {
 DirectExchange contractDirectExchange = new DirectExchange(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode());
 rabbitAdmin.declareExchange(contractDirectExchange);
 logger.debug("完成直連型交換機bean實例化");
 return contractDirectExchange;
 }
 
 //在此可以定義隊列
 
 @Bean
 Queue queueTest(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TESTQUEUE.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("測試隊列實例化完成");
 return queue;
 }
 
 //topic 1
 @Bean
 Queue queueTopicTest1(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST1.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話題測試隊列1實例化完成");
 return queue;
 }
 //topic 2
 @Bean
 Queue queueTopicTest2(RabbitAdmin rabbitAdmin){
 Queue queue = new Queue(RabbitMqEnum.QueueName.TOPICTEST2.getCode());
 rabbitAdmin.declareQueue(queue);
 logger.debug("話題測試隊列2實例化完成");
 return queue;
 }
 
 
 //在此處完成隊列和交換機綁定
 @Bean
 Binding bindingQueueTest(Queue queueTest,DirectExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTest).to(exchange).with(RabbitMqEnum.QueueEnum.TESTQUEUE.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測試隊列與直連型交換機綁定完成");
 return binding;
 }
 //topic binding1
 @Bean
 Binding bindingQueueTopicTest1(Queue queueTopicTest1,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest1).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE1.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測試隊列與話題交換機1綁定完成");
 return binding;
 }
 
 //topic binding2
 @Bean
 Binding bindingQueueTopicTest2(Queue queueTopicTest2,TopicExchange exchange,RabbitAdmin rabbitAdmin){
 Binding binding = BindingBuilder.bind(queueTopicTest2).to(exchange).with(RabbitMqEnum.QueueEnum.TESTTOPICQUEUE2.getCode());
 rabbitAdmin.declareBinding(binding);
 logger.debug("測試隊列與話題交換機2綁定完成");
 return binding;
 }
 
}

在這里用到枚舉類:RabbitMqEnum

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
 * 定義rabbitMq需要的常量
 *
 * @author chenhf
 * @create 2017-10-23 下午4:07
 **/
public class RabbitMqEnum {
 
 /**
 * @param
 * @Author:chenhf
 * @Description:定義數(shù)據(jù)交換方式
 * @Date:下午4:08 2017/10/23
 * @return
 */
 public enum Exchange {
 CONTRACT_FANOUT("CONTRACT_FANOUT", "消息分發(fā)"),
 CONTRACT_TOPIC("CONTRACT_TOPIC", "消息訂閱"),
 CONTRACT_DIRECT("CONTRACT_DIRECT", "點對點");
 
 private String code;
 private String name;
 
 Exchange(String code, String name) {
 this.code = code;
 this.name = name;
 }
 
 public String getCode() {
 return code;
 }
 
 public String getName() {
 return name;
 }
 }
 
 /**
 * describe: 定義隊列名稱
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueName {
 TESTQUEUE("TESTQUEUE", "測試隊列"),
 TOPICTEST1("TOPICTEST1", "topic測試隊列"),
 TOPICTEST2("TOPICTEST2", "topic測試隊列");
 
 private String code;
 private String name;
 
 QueueName(String code, String name) {
 this.code = code;
 this.name = name;
 }
 
 public String getCode() {
 return code;
 }
 
 public String getName() {
 return name;
 }
 
 }
 
 /**
 * describe: 定義routing_key
 * creat_user: chenhf
 * creat_date: 2017/10/31
 **/
 public enum QueueEnum {
 TESTQUEUE("TESTQUEUE1", "測試隊列key"),
 TESTTOPICQUEUE1("*.TEST.*", "topic測試隊列key"),
 TESTTOPICQUEUE2("lazy.#", "topic測試隊列key");
 
 
 private String code;
 private String name;
 
 QueueEnum(String code, String name) {
 this.code = code;
 this.name = name;
 }
 
 public String getCode() {
 return code;
 }
 
 public String getName() {
 return name;
 }
 }
 
}

以上完成消息生產(chǎn)者的定義,下面封裝調(diào)用接口

測試時直接調(diào)用此工具類,testUser類需自己實現(xiàn)

?
1
2
3
rabbitMqSender.sendRabbitmqDirect("TESTQUEUE1",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.1.2",testUser);
rabbitMqSender.sendRabbitmqTopic("lazy.TEST.2",testUser);
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
 * rabbitmq發(fā)送消息工具類
 *
 * @author chenhf
 * @create 2017-10-26 上午11:10
 **/
 
@Component
public class RabbitMqSender implements RabbitTemplate.ConfirmCallback{
 /** logger */
 private static final Logger logger = LoggerFactory.getLogger(RabbitMqSender.class);
 
 private RabbitTemplate rabbitTemplate;
 
 @Autowired
 public RabbitMqSender(RabbitTemplate rabbitTemplate) {
 this.rabbitTemplate = rabbitTemplate;
 this.rabbitTemplate.setConfirmCallback(this);
 }
 
 @Override
 public void confirm(CorrelationData correlationData, boolean b, String s) {
 logger.info("confirm: " + correlationData.getId());
 }
 
 /**
 * 發(fā)送到 指定routekey的指定queue
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqDirect(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_DIRECT.getCode(), routeKey , obj, correlationData);
 }
 
 /**
 * 所有發(fā)送到Topic Exchange的消息被轉(zhuǎn)發(fā)到所有關(guān)心RouteKey中指定Topic的Queue上
 * @param routeKey
 * @param obj
 */
 public void sendRabbitmqTopic(String routeKey,Object obj) {
 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
 logger.info("send: " + correlationData.getId());
 this.rabbitTemplate.convertAndSend(RabbitMqEnum.Exchange.CONTRACT_TOPIC.getCode(), routeKey , obj, correlationData);
 }
}

四、rabbitmq消費者配置

springboot注解方式監(jiān)聽隊列,無法手動指定回調(diào),所以采用了實現(xiàn)ChannelAwareMessageListener接口,重寫onMessage來進行手動回調(diào),詳見以下代碼,詳細介紹可以在spring的官網(wǎng)上找amqp相關(guān)章節(jié)閱讀

直連消費者

通過設(shè)置TestUser的name來測試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺中隊列中消息是否被消費

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class ExampleAmqpConfiguration {
 @Bean("testQueueContainer")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TESTQUEUE");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }
 
 
 @Bean("testQueueListener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 //通過設(shè)置TestUser的name來測試回調(diào),分別發(fā)兩條消息,一條UserName為1,一條為2,查看控制臺中隊列中消息是否被消費
 if ("2".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 }
 
 if ("1".equals(testUser.getUserName())){
  System.out.println(testUser.toString());
  channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
 }
 
 }
 };
 }
 
}

topic消費者1

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration {
 @Bean("topicTest1Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST1");
 container.setMessageListener(exampleListener1());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }
 
 
 @Bean("topicTest1Listener")
 public ChannelAwareMessageListener exampleListener1(){
 return new ChannelAwareMessageListener() {
 @Override
 public void onMessage(Message message, Channel channel) throws Exception {
 TestUser testUser = (TestUser) SerializeUtil.unserialize(message.getBody());
 System.out.println("TOPICTEST1:"+testUser.toString());
 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
 
 }
 };
 }
 
 
 
 
}

topic消費者2

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * 消費者配置
 *
 * @author chenhf
 * @create 2017-10-30 下午3:14
 **/
@Configuration
@AutoConfigureAfter(RabbitMqConfig.class)
public class TopicAmqpConfiguration2 {
 @Bean("topicTest2Container")
 public MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames("TOPICTEST2");
 container.setMessageListener(exampleListener());
 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 return container;
 }
 
 
 @Bean("topicTest2Listener")
 public ChannelAwareMessageListener exampleListener() {
 return new ChannelAwareMessageListener() {
 @Override
 public void

總結(jié)

以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,如果有疑問大家可以留言交流,謝謝大家對服務(wù)器之家的支持。

原文鏈接:https://segmentfault.com/a/1190000011797667

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 国产91精品在线播放 | 亚洲人成在线观看一区二区 | 99久在线| 糖心vlog麻豆精东影业传媒 | 久草在线福利视频在线播放 | 114级毛片免费观看 1024亚洲天堂 | 电车痴汉中文字幕 | 日本一区二区免费在线观看 | 9久re在线观看视频精品 | 精品亚洲欧美中文字幕在线看 | 国产欧美一区二区精品性色 | 国产精品高清一区二区三区 | 亚洲日本在线观看网址 | 国产另类视频 | 亚洲高清国产拍精品动图 | 干露露视频 性感写真 | 午夜爱爱爱爱爽爽爽视频网站 | 亚洲羞羞裸色私人影院 | 无遮挡免费h肉动漫在线观看 | 日韩精品福利视频一区二区三区 | 免费在线观看网址入口 | 国产成人影院一区二区 | 貂蝉沦为姓奴小说 | 久久一本岛在免费线观看2020 | 504神宫寺奈绪大战黑人 | 亚洲国产经典 | 手机在线免费观看日本推理片 | 亚洲视频一区在线播放 | 欧美日韩在线成人看片a | 久久精品热在线观看30 | 亚洲一区二区三区在线播放 | 女主被男主为催奶药h | 国产免费久久精品 | 性妲己| 精品国语对白精品自拍视 | 国产成人久久精品一区二区三区 | 韩国理论片最新第一页 | 猫咪av| videos变态极端 | 国产免费一区二区三区免费视频 | 波多野结衣178部中文字幕 |