前言
消息隊列RocketMQ版是阿里云基于Apache RocketMQ構(gòu)建的低延遲、高并發(fā)、高可用、高可靠的分布式消息中間件。
看過我之前幾篇文章的應(yīng)該都大概隊消息隊列有個概念了,都明白了,那這個消息從何而來呢?
所謂黃河之水天上來,大自然間每一個事物都不是平白無故來的吧?????怎么來的,????它母親生產(chǎn)的;香奈兒????怎么來的,機(jī)器加原料生產(chǎn)的;就連平時吃的大米,也是有出處的;咱們是怎么來的,咱們當(dāng)然是偉大的母親生產(chǎn)下來的了
順便感謝一下偉大的母親,周日記得給她打個電話哦
下面進(jìn)入主題,這是分割線
消息隊列RocketMQ版既可為分布式應(yīng)用系統(tǒng)提供異步解耦和削峰填谷的能力,同時也具備互聯(lián)網(wǎng)應(yīng)用所需的海量消息堆積、高吞吐、可靠重試等特性。下面列舉了一些特點
- 消息查詢:消息隊列RocketMQ版提供了三種消息查詢的方式,分別是按Message ID、Message Key以及Topic查詢
- 查詢消息軌跡:通過消息軌跡,能清晰定位消息從生產(chǎn)者發(fā)出,經(jīng)由消息隊列RocketMQ版服務(wù)端,投遞給消息消費者的完整鏈路,方便定位排查問題
- 集群消費和廣播消費:當(dāng)使用集群消費模式時,消息隊列RocketMQ版認(rèn)為任意一條消息只需要被消費者集群內(nèi)的任意一個消費者處理即可;當(dāng)使用廣播消費模式時,消息隊列RocketMQ版會將每條消息推送給消費者集群內(nèi)所有注冊過的消費者,保證消息至少被每臺機(jī)器消費一次
- 重置消費位點:根據(jù)時間或位點重置消費進(jìn)度,允許用戶進(jìn)行消息回溯或者丟棄堆積消息
- 死信隊列:將無法正常消費的消息儲存到特殊的死信隊列供后續(xù)處理
- 全球信息路由:用于全球不同地域之間的消息同步,保證地域之間的數(shù)據(jù)一致性
客戶端,其實很容易理解了,我們可以把RocketMQ理解成一個消息服務(wù),既然是一個服務(wù),我們就需要調(diào)用這個服務(wù),那么調(diào)用這個服務(wù)的時候,這個消息從哪里來,這個就是要根據(jù)業(yè)務(wù)場景來定了,所以啊,消息的生產(chǎn)者Producer屬于一個客戶端;消息產(chǎn)生了,總不能一直放著吧,總要有人處理掉這些消息吧,這也是業(yè)務(wù)決定的,所以消息的消費者consumer也是屬于客戶端。
下面啊,大魚就帶著大家一起來看看這客戶端的用處
生產(chǎn)者Producer
生產(chǎn)者Producer,顧名思義,就是負(fù)責(zé)生產(chǎn)消息的,此時大家應(yīng)該腦子有很多問號才對,比如Producer發(fā)消息發(fā)到哪里了,流程是怎么樣的,發(fā)的消息都是什么類型的等等這些,這些問題搞懂了的話,Producer這個客戶端基本就搞定了
魚魚教大家一個小技巧,學(xué)習(xí)一個東西,先搞懂大體流程,再拆分而細(xì)攻之,最后再統(tǒng)籌理解,這樣效果會很好,獨家秘方
接下來我從消息是如何發(fā)送的(負(fù)載均衡、容錯機(jī)制)、消息發(fā)給誰和存儲到哪里、消息的類型三方面來介紹Producer
1、消息是如何發(fā)送的?
首先,消息總不能產(chǎn)生了哪里也不去吧,那產(chǎn)生這個消息就沒有任何意義了,所以這個消息總要發(fā)送到一個地方去,接力傳遞,看下面這個圖
Producer會首先從本地緩存中獲取到指定的Topic,如果找到就直接根據(jù)這個Topic發(fā)送產(chǎn)生的消息,緩存大家都明白啊,就是為了優(yōu)化速度,減少網(wǎng)絡(luò)傳輸。
沒有的話,就要去NameServer獲取最新的Topic列表(這個是Broker啟動的時候注冊到NameServer上的),通過一定的策略選擇一個MessageQueue隊列,獲取這個mq所在的Broker地址,也是先從本地緩存中獲取,如果獲取不到則請求NameServer獲取(NameServer中也同樣注冊了Broker地址和Topic的映射關(guān)系),進(jìn)行發(fā)送消息
發(fā)送失敗的話,會有重試機(jī)制,默認(rèn)是重試三次
其實保存這么多,既能減少和NameServer之間的網(wǎng)絡(luò)傳輸,又能減小NameServer的壓力,NameServer本身就是屬于輕量級的設(shè)計,這樣也有利于減輕NameServer的壓力,NameServer我也會單獨寫一篇來介紹
負(fù)載均衡
我們知道消息發(fā)送的時候會首先選擇一個對應(yīng)的Topic,每個Topic會對應(yīng)多個MessageQueue,這樣就有一個問題,發(fā)消息的時候要是做不到雨露均沾,可能就會有的隊列多,有的隊列少這樣的問題,就會造成資源的浪費
RocketMQ采用了樸素的方式,沒錯,就是輪詢,高端的食材往往只需要最樸素的烹飪方式~
生產(chǎn)者通過輪詢某個 Topic 下的所有 MessageQueue 的方式來實現(xiàn)發(fā)送方的負(fù)載均衡,簡單來說就是人人都有份,如下圖:
通過這種方式,可以將一個 Topic 的消息分散到多個 MessageQueue 上,進(jìn)而分散到多個 Broker 上。
發(fā)送消息的容錯機(jī)制:
Producer 作為發(fā)送消息的一方,有3種容錯機(jī)制:
- 本地緩存:把從 NameSever 獲取的信息緩存到本地,以防 NameSever 宕機(jī)
- 不可用Broker集合:Producer有一個 Broker 的容錯機(jī)制,開關(guān)sendLatencyFaultEnable可以開啟,RocketMq內(nèi)部會維護(hù)一個故障Broker的HashMap,把一定延遲級別的Broker放入這個map,下次選擇Broker的時候,就會規(guī)避不可用的Broker。
- 重試:Producer發(fā)送消息時,有一個重試機(jī)制,默認(rèn)重試3次。死信隊列 Consumer消費重試超過指定次數(shù),進(jìn)入死信隊列
通過這種方式,可以將一個 Topic 的消息分散到多個 MessageQueue 上,進(jìn)而分散到多個 Broker 上。
2、消息發(fā)給誰和存儲在哪里?
Producer連接NameSever
Producer 通過 NameSever 獲取指定 Topic 的 Broker 路由信息,并在本地保存一份緩存數(shù)據(jù),比如一個Topic有哪些 MessageQueue,MessageQueue 在哪幾臺 Broker 上,Broker 的ip.port等等。Producer 發(fā)送消息只發(fā)到 Master Broker上,Slave 通過主從同步獲取數(shù)據(jù)。
那么 Produce 是怎么連接NameSever 的呢
- 連接:單個生產(chǎn)者者和一臺 Nameserver 保持長連接,定時查詢topic配置信息,如果該nameserver掛掉,生產(chǎn)者會自動連接下一個nameserver,直到有可用連接為止,并能自動重連。
- 輪詢時間:默認(rèn)情況下,生產(chǎn)者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味著某個broker如果宕機(jī),生產(chǎn)者最多要30秒才能感知,在此期間,發(fā)往該broker的消息發(fā)送失敗。該時間由DefaultMQProducer的pollNameServerInteval參數(shù)決定,可手動配置。
- 心跳:與nameserver沒有心跳
Producer連接Broker
- 連接:生產(chǎn)者 跟 Topic 涉及的所有Broker 保持長連接。
- 心跳:默認(rèn)情況下,生產(chǎn)者每隔30秒向所有broker發(fā)送心跳。broker每隔10秒鐘(此時間無法更改),掃描所有還存活的連接,若某個連接2分鐘內(nèi)(當(dāng)前時間與最后更新時間差值超過2分鐘,此時間無法更改)沒有發(fā)送心跳數(shù)據(jù),則關(guān)閉連接
Producer連接上Broker之后,消息會通過輪詢的方式發(fā)送到Broker上,并且存儲在Broker中的CommitLog中,這里面存儲的是原始消息,還有一個ConsumeQueue用于存儲投遞到某一個queue的消息的位置信息。當(dāng)然,消息隊列會持久化到磁盤中的,不影響內(nèi)存,當(dāng)然也會定期清理消息。
那消費完的消息去了哪里呢?什么時候清理物理消息文件呢?還有這樣設(shè)計的好處呢?
這些我們都留在下下一篇中,也就是Broker篇,讓你透徹了解Broker這個大腦是如何助力RocketMQ支持這么高的吞吐量的
總之啊,這個問題值得大家深入研究一下,如果再面試的時候,你不僅能說出RocketMQ的用處,你還能說出它的存儲原理和尋址原理,那面試官就愛上你了。此時你再拿出王炸,就是解決各種實際問題的能力,比如如何處理重復(fù)消息啊、如何保證消息的順序性啊、在分布式系統(tǒng)中如何保證分布式事務(wù)啊
面試官當(dāng)場給你發(fā)offer,say:How much money do you expect to work for us ?
3、消息的種類
RocketMQ種的消息種類大致可以分為四種:普通消息、定時和延時消息、順序消息、事務(wù)消息四種類型,這是重點!
簡單介紹下四種類型
- 普通消息:消息隊列RocketMQ版中無特性的消息,區(qū)別于有特性的定時和延時消息、順序消息和事務(wù)消息。
- 定時和延時消息:允許消息生產(chǎn)者對指定消息進(jìn)行定時(延時)投遞,最長支持40天。
- 順序消息:允許消息消費者按照消息發(fā)送的順序?qū)ο⑦M(jìn)行消費。
- 事務(wù)消息:實現(xiàn)類似X或Open XA的分布事務(wù)功能,以達(dá)到事務(wù)最終一致性狀態(tài)。
消息隊列RocketMQ提供的四種消息類型所對應(yīng)的Topic不能混用,例如,創(chuàng)建的普通消息的Topic只能用于收發(fā)普通消息,不能用于收發(fā)其他類型的消息;同理,事務(wù)消息的Topic也只能收發(fā)事務(wù)消息,不能用于收發(fā)其他類型的消息,以此類推
普通消息
普通消息:消息隊列RocketMQ中無特性的消息,區(qū)別于有特性的定時和延時消息、順序消息和事務(wù)消息
普通消息以三種發(fā)送方式:同步Sync發(fā)送、異步Async發(fā)送和單向Oneway發(fā)送
同步就是我們發(fā)送了消息之后必須等到服務(wù)器響應(yīng)之后才能發(fā)送下一個;異步適用于對時間較敏感的業(yè)務(wù)場景,異步不需要等待服務(wù)器的響應(yīng)就可以連續(xù)發(fā)送消息;單向則比異步用時更短,一般在微秒級別,但是可靠性會降低,因為只管發(fā)送,不等待服務(wù)器響應(yīng),也沒有回調(diào)函數(shù)觸發(fā)
同步發(fā)送
同步,消息發(fā)送方發(fā)出一條消息后,會在收到服務(wù)端返回響應(yīng)之后才發(fā)下一條消息的通訊方式
異步發(fā)送
異步發(fā)送是指發(fā)送方發(fā)出一條消息后,不等服務(wù)端返回響應(yīng),接著發(fā)送下一條消息的通訊方式
消息隊列RocketMQ版的異步發(fā)送,需要實現(xiàn)異步發(fā)送回調(diào)接口(SendCallback)。消息發(fā)送方在發(fā)送了一條消息后,不需要等待服務(wù)端響應(yīng)即可發(fā)送第二條消息。發(fā)送方通過回調(diào)接口接收服務(wù)端響應(yīng),并處理響應(yīng)結(jié)果
一般用于對時間較敏感的業(yè)務(wù)場景
單向發(fā)送
發(fā)送方只負(fù)責(zé)發(fā)送消息,不等待服務(wù)端返回響應(yīng)且沒有回調(diào)函數(shù)觸發(fā),即只發(fā)送請求不等待應(yīng)答。此方式發(fā)送消息的過程耗時非常短,一般在微秒級別
應(yīng)用于對可靠性要求并不高的場景,比如日志收集
定時和延時消息
定時和延時消息:允許消息生產(chǎn)者對指定消息進(jìn)行定時(延時)投遞,最長支持40天
延時消息用于指定消息發(fā)送到消息隊列RocketMQ版的服務(wù)端后,延時一段時間才被投遞到客戶端進(jìn)行消費(例如3秒后才被消費),適用于解決一些消息生產(chǎn)和消費有時間窗口要求的場景,或者通過消息觸發(fā)延遲任務(wù)的場景,類似于延遲隊列。
定時消息可以做到在指定時間戳之后才可被消費者消費,適用于對消息生產(chǎn)和消費有時間窗口要求,或者利用消息觸發(fā)定時任務(wù)的場景。
適用場景
通過消息來觸發(fā)一些定時任務(wù),這個時候這個定時消息就派上用場了,比如在某一時間向用戶發(fā)送的提醒消息;一些消息生產(chǎn)和消費之間有時間窗口,比如典型的電商里面的超時未支付關(guān)閉訂單的場景,這時延時消息就派上用場了,超時未完成支付就關(guān)閉訂單
定時消息的精度會有1s~2s的延遲誤差
其實定時消息和延時消息在使用的時候也是有一些差別的,用過的應(yīng)該都知道,給大家提一下,定時消息需要明確指定消息發(fā)送時間點之后的某一時間點作為消息投遞的時間點;延時消息則需要設(shè)定一個延時的時間長度,長度是固定的,但是時刻點不是固定,是根據(jù)發(fā)送消息的時間點有關(guān)的,消息將從當(dāng)前發(fā)送時間點開始延遲固定時間之后才開始投遞,這個大家應(yīng)該都很清楚了,淘寶下個單,給你留30分鐘時間支付,超時未支付則關(guān)閉訂單
順序消息
順序消息:允許消息消費者按照消息發(fā)送的順序進(jìn)行消息的發(fā)送
順序消息分為兩類:
- 全局順序:對于指定的一個Topic,所有消息按照嚴(yán)格的先入先出FIFO(First In First Out)的順序進(jìn)行發(fā)布和消費。
- 分區(qū)順序:對于指定的一個Topic,所有消息根據(jù)Sharding Key進(jìn)行區(qū)塊分區(qū)。同一個分區(qū)內(nèi)的消息按照嚴(yán)格的FIFO順序進(jìn)行發(fā)布和消費。Sharding Key是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的Key是完全不同的概念。
其實這也是個比較經(jīng)典的問題,面試也是比較常問的,就是如何保證順序性?魚魚反正會回答,你會嗎?
如果遇到這個問題,首先你要分情況說明,就是分為全局順序和分區(qū)順序這兩種情況:
1、全局順序適用于性能要求不高,所有的消息都要嚴(yán)格按照先進(jìn)先出的順序來發(fā)布和消費的場景。這種情況我也沒遇到過,一般也不太會使用全局有序這種
2、分區(qū)順序適用于性能要求比較高,以Sharding Key作為分區(qū)字段,在用一個區(qū)塊中嚴(yán)格按照先進(jìn)先出的順序發(fā)布和消費。比如用戶注冊的時候的驗證碼,以用戶ID作為Sharding Key,那么同一個用戶發(fā)送的消息都會按照發(fā)布的先后順序來消費,再比如就是電商中的訂單流程問題
阿里巴巴集團(tuán)內(nèi)部電商系統(tǒng)均使用分區(qū)順序消息,既保證業(yè)務(wù)的順序,同時又能保證業(yè)務(wù)的高性能。別問我怎么知道的,阿里云官網(wǎng)寫的
順序消息常見問題
為什么全局順序消息性能一般?
全局順序消息是嚴(yán)格按照FIFO的消息阻塞原則,即上一條消息沒有被成功消費,那么下一條消息會一直被存儲到Topic隊列中。如果想提高全局順序消息的TPS,可以升級實例配置,同時消息客戶端應(yīng)用盡量減少處理本地業(yè)務(wù)邏輯的耗時。
順序消息支持哪種消息發(fā)送方式?是否支持集群消費和廣播消費?
順序消息只支持可靠同步發(fā)送方式,不支持異步發(fā)送方式,否則將無法嚴(yán)格保證順序。順序消息暫時僅支持集群消費模式,不支持廣播消費模式。
事務(wù)消息
事務(wù)消息:實現(xiàn)類似X或者Open XA的分布式事務(wù)功能,以達(dá)到最終一致性
消息隊列RocketMQ版提供類似X或Open XA的分布式事務(wù)功能,通過消息隊列RocketMQ版事務(wù)消息,能達(dá)到分布式事務(wù)的最終一致。
半事務(wù)消息:暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊列RocketMQ版服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對該消息的二次確認(rèn),此時該消息被標(biāo)記成“暫不能投遞”狀態(tài),處于該種狀態(tài)下的消息即半事務(wù)消息。
消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確認(rèn)丟失,消息隊列RocketMQ版服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時,需要主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit或是Rollback),該詢問過程即消息回查。
跟小仙來看看事務(wù)消息發(fā)送步驟:
1、發(fā)送方將半事務(wù)消息發(fā)送到服務(wù)端Broker,服務(wù)端會將消息持久化,成功之后會返回ACK確認(rèn)消息已經(jīng)發(fā)送成功,此時消息為半事務(wù)消息
2、發(fā)送方開始執(zhí)行本地事務(wù)的邏輯
3、發(fā)送方會根據(jù)本地事務(wù)的執(zhí)行結(jié)果向服務(wù)端提交二次確認(rèn),決定Commit還是Rollback,服務(wù)端收到Commit之后則把這個消息標(biāo)記為可投遞,發(fā)送到消費方;服務(wù)端收到Rollback之后則刪除半事務(wù)消息,服務(wù)端不會發(fā)送,則消費方也不會收到
如可是如果斷網(wǎng)或者應(yīng)用重啟這些情況,上述的步驟的二次確認(rèn)信息無法到達(dá)服務(wù)端,怎么辦?
這里其實有個回查機(jī)制,發(fā)送方發(fā)送消息之后,需要本地執(zhí)行事務(wù),如果事務(wù)執(zhí)行的過程出現(xiàn)卡死的情況,或者事務(wù)執(zhí)行結(jié)果因為網(wǎng)絡(luò)等問題,無法傳遞事務(wù)結(jié)果到服務(wù)端,服務(wù)端會執(zhí)行一個回查機(jī)制,來確認(rèn)這個半事務(wù)消息的最終提交情況。
總結(jié)
消息隊列RocketMQ版的消費者和生產(chǎn)者客戶端對象是線程安全的,可以在多個線程之間共享使用。可以在服務(wù)器上(或者多臺服務(wù)器)部署多個生產(chǎn)者和消費者實例,也可以在同一個生產(chǎn)者或消費者實例里采用多線程發(fā)送或接收消息,從而提高消息發(fā)送或接收TPS。避免為每個線程創(chuàng)建一個客戶端實例。
好了,回顧一下本篇的內(nèi)容吧
1、消息發(fā)送的負(fù)載均衡、容錯機(jī)制
2、消息發(fā)送流程和存儲(具體如何存儲會在Broker篇說,因為這些東西都存儲在Broker的CommitLog和ConsumerQueue中了)
3、消息的類型:普通消息(同步發(fā)送、異步發(fā)送、單向發(fā)送)、定時和延時消息、順序消息(全局順序和部分順序)、事務(wù)消息
原文地址:https://mp.weixin.qq.com/s/hg04Q-jkopxbqOsX7sVhgA