摘要
整合場(chǎng)景含 topic 工作模式(通過 routingKey 可滿足簡(jiǎn)單/工作隊(duì)列/發(fā)布訂閱/路由等四種工作模式)和 confirm(消息確認(rèn))、return(消息返回)、basicAck(消息簽收)、basicNack(拒絕簽收)、DLX(Dead Letter Exchange死信隊(duì)列)實(shí)現(xiàn)延時(shí)/定時(shí)任務(wù)等。
整合
依賴與配置
以下內(nèi)容消費(fèi)者同生產(chǎn)者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
< parent > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-parent</ artifactId > < version >2.1.4.RELEASE</ version > </ parent > < dependencies > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-amqp</ artifactId > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-web</ artifactId > </ dependency > </ dependencies > |
1
2
3
4
5
6
7
8
9
|
server.port=8090 spring.rabbitmq.host=192.168.168.10 spring.rabbitmq.port=5672 spring.rabbitmq.username=zheng123 spring.rabbitmq.password=zheng123 spring.rabbitmq.virtual-host=/zheng spring.rabbitmq.publisher-confirms=true spring.rabbitmq.publisher-returns=true spring.rabbitmq.listener.direct.acknowledge-mode=manual |
生產(chǎn)者配置消息隊(duì)列規(guī)則
下邊是兩種配置方式,本次整合示例中使用第一個(gè)配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
@Configuration public class TopicConfig { // 聲明隊(duì)列 @Bean public Queue topicQ1() { return new Queue( "topic_sb_mq_q1" ); } // 聲明隊(duì)列并綁定該隊(duì)列到死信交換機(jī)(返回值有兩種寫法,任選一種都可以) // 測(cè)試死信需要關(guān)閉原隊(duì)列的監(jiān)聽 @Bean public Queue topicQ2() { return QueueBuilder.durable( "topic_sb_mq_q2" ) .withArgument( "x-dead-letter-exchange" , "topicExchange" ) .withArgument( "x-dead-letter-routing-key" , "changsha.f" ) .withArgument( "x-message-ttl" , 10000 ) .build(); Map<String,Object> arguments = new HashMap<>( 2 ); arguments.put( "x-dead-letter-exchange" , "topicExchange" ); arguments.put( "x-dead-letter-routing-key" , "changsha.f" ); arguments.put( "x-message-ttl" , 10000 ); return new Queue( "topic_sb_mq_q2" , true , false , false ,arguments); } //聲明exchange @Bean public TopicExchange setTopicExchange() { return new TopicExchange( "topicExchange" ); } //聲明binding,需要聲明一個(gè)routingKey @Bean public Binding bindTopicHebei1() { return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with( "changsha.*" ); } @Bean public Binding bindTopicHebei2() { return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with( "#.beijing" ); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
@Configuration public class RabbitMqConfig { //定義交換機(jī)的名字 public static final String EXCHANGE_NAME = "boot_topic_exchange" ; //定義隊(duì)列的名字 public static final String QUEUE_NAME = "boot_queue" ; //1、聲明交換機(jī) @Bean ( "bootExchange" ) public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable( true ).build(); } //2、聲明隊(duì)列 @Bean ( "bootQueue" ) public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } //3、隊(duì)列與交換機(jī)進(jìn)行綁定 @Bean public Binding bindQueueExchange( @Qualifier ( "bootQueue" ) Queue queue, @Qualifier ( "bootExchange" ) Exchange exchange){ // topic模式兼容廣播模式,路由模式。with("#")則類似廣播模式匹配所有訂閱者;with("boot.1")則類似路由模式匹配指定訂閱者 return BindingBuilder.bind(queue).to(exchange).with( "boot.#" ).noargs(); } } |
生產(chǎn)者發(fā)布消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping (value= "/topicSend" ) public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException { // 定義 confirm 回調(diào) rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { // confirmed } else { // nack-ed } }); rabbitTemplate.setMandatory( true ); rabbitTemplate.setReturnCallback((msg, replyCode, replyText, exchange, routKey)->{ // return message }); if ( null == routingKey) { routingKey= "changsha.kf" ; } MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN); //fanout模式只往exchange里發(fā)送消息。分發(fā)到exchange下的所有queue rabbitTemplate.send( "topicExchange" , routingKey, new Message(message.getBytes( "UTF-8" ),messageProperties)); return "message sended : routingKey >" +routingKey+ ";message > " +message; } } |
消費(fèi)者監(jiān)聽消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
@Component public class ConcumerReceiver { //topic 模式 //注意這個(gè)模式會(huì)有優(yōu)先匹配原則。例如發(fā)送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不會(huì)再去匹配*.ITd @RabbitListener (queues= "topic_sb_mq_q1" ) public void topicReceiveq1(String msg,Message message, Channel channel) throws IOException { // 消息id long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // message.getBody() todosomething // 簽收消息 channel.basicAck(deliveryTag, true ); } catch (Exception e) { // 拒絕簽收 // 第三個(gè)參數(shù):requeue:重回隊(duì)列。如果設(shè)置為true,則消息重新回到queue,broker會(huì)重新發(fā)送該消息給消費(fèi)端 channel.basicNack(deliveryTag, true , true ); } } @RabbitListener (queues= "topic_sb_mq_q2" ) public void topicReceiveq2(String message) { System.out.println( "Topic模式 topic_sb_mq_q2 received message : " +message); } } |
到此這篇關(guān)于SpringBoot整合RabbitMQ及生產(chǎn)全場(chǎng)景高級(jí)特性實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)SpringBoot整合RabbitMQ內(nèi)容請(qǐng)搜索服務(wù)器之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持服務(wù)器之家!
原文鏈接:https://juejin.cn/post/7007641887821529119