一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Spring Boot RabbitMQ 延遲消息實現(xiàn)完整版示例

Spring Boot RabbitMQ 延遲消息實現(xiàn)完整版示例

2021-04-25 13:20Sam哥哥 Java教程

本篇文章主要介紹了Spring Boot RabbitMQ 延遲消息實現(xiàn)完整版示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

概述

曾經(jīng)去網(wǎng)易面試的時候,面試官問了我一個問題,說

下完訂單后,如果用戶未支付,需要取消訂單,可以怎么做

我當時的回答是,用定時任務掃描db表即可。面試官不是很滿意,提出:

用定時任務無法做到準實時通知,有沒有其他辦法?

我當時的回答是:

可以用隊列,訂單下完后,發(fā)送一個消息到隊列里,并指定過期時間,時間一到,執(zhí)行回調(diào)接口。

面試官聽完后,就不再問了。其實我當時的思路是對的,只不過講的不是很專業(yè)而已。專業(yè)說法是利用 延遲消息 。

其實用定時任務,確實有點問題,原本業(yè)務系統(tǒng)希望10分鐘后,如果訂單未支付,就馬上取消訂單,并釋放商品庫存。但是一旦數(shù)據(jù)量大的話,就會加長獲取未支付訂單數(shù)據(jù)的時間,部分訂單就做不到10分鐘后取消了,可能是15分鐘,20分鐘之類的。這樣的話,庫存就無法及時得到釋放,也就會影響成單數(shù)。而利用延遲消息,則理論上是可以做到按照設定的時間,進行訂單取消操作的。

目前網(wǎng)上關于使用rabbitmq實現(xiàn)延遲消息的文章,大多都是講如何利用rabbitmq的死信隊列來實現(xiàn),實現(xiàn)方案看起來都很繁瑣復雜,并且還是使用原始的rabbitmq client api來實現(xiàn)的,更加顯得啰嗦。

spring boot 已經(jīng)對rabbitmq client api進行了包裝,使用起來簡潔很多,下面詳細介紹一下如何利用 rabbitmq_delayed_message_exchange 插件和spring boot來實現(xiàn)延遲消息。

軟件準備

erlang

本文使用的版本是:erlang 20.3

rabbitmq

本文使用的是 window 版本的rabbitmq,版本號是:3.7.4

rabbitmq_delayed_message_exchange插件

插件下載地址:http://www.rabbitmq.com/community-plugins.html

打開網(wǎng)址后,ctrl + f,搜索 rabbitmq_delayed_message_exchange 。

Spring Boot RabbitMQ 延遲消息實現(xiàn)完整版示例

千萬記住,一定選好版本號,由于我使用的是rabbitmq 3.7.4,因此對應的 rabbitmq_delayed_message_exchange 插件也必須選擇3.7.x的。

如果沒有選對版本,在使用延遲消息的時候,會遇到各種各樣的奇葩問題,而且網(wǎng)上還找不到解決方案。我因為這個問題,折騰了整整一個晚上。請牢記,要選對插件版本。

下載完插件后,將其放置到rabbitmq安裝目錄下的 plugins 目錄下,并使用如下命令啟動這個插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

如果啟動成功會出現(xiàn)如下信息:

the following plugins have been enabled:   rabbitmq_delayed_message_exchange

啟動插件成功后,記得重啟一下rabbitmq,讓其生效。

集成rabbitmq

這個就非常簡單了,直接在maven工程的pom.xml文件中加入

?
1
2
3
4
<dependency>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

spring boot的版本我使用的是 2.0.1.release .

接下來在 application.properties 文件中加入redis配置:

?
1
2
3
4
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

定義connectionfactory和rabbittemplate

也很簡單,代碼如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.mq.rabbitmq;
 
import org.springframework.amqp.rabbit.connection.cachingconnectionfactory;
import org.springframework.amqp.rabbit.connection.connectionfactory;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.boot.context.properties.configurationproperties;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
 
@configuration
@configurationproperties(prefix = "spring.rabbitmq")
public class rabbitmqconfig {
  private string host;
  private int port;
  private string username;
  private string password;
 
  @bean
  public connectionfactory connectionfactory() {
    cachingconnectionfactory cachingconnectionfactory = new cachingconnectionfactory(host,port);
    cachingconnectionfactory.setusername(username);
    cachingconnectionfactory.setpassword(password);
    cachingconnectionfactory.setvirtualhost("/");
    cachingconnectionfactory.setpublisherconfirms(true);
    return cachingconnectionfactory;
  }
 
  @bean
  public rabbittemplate rabbittemplate() {
    rabbittemplate rabbittemplate = new rabbittemplate(connectionfactory());
    return rabbittemplate;
  }
 
  public string gethost() {
    return host;
  }
 
  public void sethost(string host) {
    this.host = host;
  }
 
  public int getport() {
    return port;
  }
 
  public void setport(int port) {
    this.port = port;
  }
 
  public string getusername() {
    return username;
  }
 
  public void setusername(string username) {
    this.username = username;
  }
 
  public string getpassword() {
    return password;
  }
 
  public void setpassword(string password) {
    this.password = password;
  }
}

exchange和queue配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.mq.rabbitmq;
 
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
 
import java.util.hashmap;
import java.util.map;
 
@configuration
public class queueconfig {
 
  @bean
  public customexchange delayexchange() {
    map<string, object> args = new hashmap<>();
    args.put("x-delayed-type", "direct");
    return new customexchange("test_exchange", "x-delayed-message",true, false,args);
  }
 
  @bean
  public queue queue() {
    queue queue = new queue("test_queue_1", true);
    return queue;
  }
 
  @bean
  public binding binding() {
    return bindingbuilder.bind(queue()).to(delayexchange()).with("test_queue_1").noargs();
  }
}

這里要特別注意的是,使用的是 customexchange ,不是 directexchange ,另外 customexchange 的類型必須是 x-delayed-message 。

實現(xiàn)消息發(fā)送

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.mq.rabbitmq;
 
import org.springframework.amqp.amqpexception;
import org.springframework.amqp.core.message;
import org.springframework.amqp.core.messagepostprocessor;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.stereotype.service;
 
import java.text.simpledateformat;
import java.util.date;
 
@service
public class messageserviceimpl {
 
  @autowired
  private rabbittemplate rabbittemplate;
 
  public void sendmsg(string queuename,string msg) {
    simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
    system.out.println("消息發(fā)送時間:"+sdf.format(new date()));
    rabbittemplate.convertandsend("test_exchange", queuename, msg, new messagepostprocessor() {
      @override
      public message postprocessmessage(message message) throws amqpexception {
        message.getmessageproperties().setheader("x-delay",3000);
        return message;
      }
    });
  }
}

注意在發(fā)送的時候,必須加上一個header

x-delay

在這里我設置的延遲時間是3秒。

消息消費者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package com.mq.rabbitmq;
import org.springframework.amqp.rabbit.annotation.rabbithandler;
import org.springframework.amqp.rabbit.annotation.rabbitlistener;
import org.springframework.stereotype.component;
 
import java.text.simpledateformat;
import java.util.date;
@component
public class messagereceiver {
 
  @rabbitlistener(queues = "test_queue_1")
  public void receive(string msg) {
    simpledateformat sdf = new simpledateformat("yyyy-mm-dd hh:mm:ss");
    system.out.println("消息接收時間:"+sdf.format(new date()));
    system.out.println("接收到的消息:"+msg);
  }
}

運行spring boot程序和發(fā)送消息

直接在main方法里運行spring boot程序,spring boot會自動解析 messagereceiver 類的。

接下來只需要用junit運行一下發(fā)送消息的接口即可。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.mq.rabbitmq;
import org.junit.test;
import org.junit.runner.runwith;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.boot.test.context.springboottest;
import org.springframework.test.context.junit4.springrunner;
@runwith(springrunner.class)
@springboottest
public class rabbitmqapplicationtests {
  @autowired
  private messageserviceimpl messageservice;
  @test
  public void send() {
    messageservice.sendmsg("test_queue_1","hello i am delay msg");
  }
}

運行完后,可以看到如下信息:

消息發(fā)送時間:2018-05-03 12:44:53
3秒鐘后,spring boot控制臺會輸出:
消息接收時間:2018-05-03 12:44:56
接收到的消息:hello i am delay msg

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:https://blog.csdn.net/linsongbin1/article/details/80178122

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 99爱爱| 性色AV一区二区三区V视界影院 | 亚洲老头老太hd | 黄a级| 欧美三级不卡在线观线看高清 | 日韩一区在线播放 | 秒播影视 午夜福利毛片 | 九色PORNY真实丨国产大胸 | 欧美亚洲桃花综合 | 91在线精品老司机免费播放 | 农村妇女野战bbxxx农村妇女 | 金牛网155755水心论坛黄大父母 | 免费精品国产 | 欧美日韩国产另类一区二区三区 | 国产精品制服丝袜白丝www | 国产成人精品本亚洲 | 久久精品熟女亚洲AV国产 | 日本一区二区高清免费不卡 | 亚洲 在线 日韩 欧美 | a级片欧美 | 欧美 国产 日韩 第一页 | 成人区精品一区二区毛片不卡 | 国产人人艹 | 亚欧有色在线观看免费版高清 | 欧美夜夜精品一级爽 | 日本人在线看片 | 无人区在线观看免费国语完整版 | 香蕉tv亚洲专区在线观看 | 91亚洲精品第一综合不卡播放 | bt7086新片速递亚洲最新合集 | 亚欧精品在线观看 | 2020年精品国产午夜福利在线 | 亚洲精品无码不卡在线观看 | 99视频久久精品久久 | 奇米影视小说 | 啊好大好粗 | 艾秋麻豆果冻剧传媒在线播放 | 国产综合成色在线视频 | 亚洲精品国产精品精 | 四虎在线免费播放 | jzz大全部 |