rocketmq怎么保证数据不会重复_RocketMQ保证信息有序性和防止重复
分布式開放消息系統(RocketMQ)的原理與實踐
分布式消息系統做為實現分布式系統可擴展、可伸縮性的關鍵組件,須要具備高吞吐量、高可用等特色。而談到消息系統的設計,就回避不了兩個問題:java
消息的順序問題
消息的重復問題
RocketMQ做為阿里開源的一款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關鍵特性?其實現原理是怎樣的?web
關鍵特性以及其實現原理
1、順序消息
消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了 3 條消息,分別是訂單建立、訂單付款、訂單完成。消費時,要按照這個順序消費才有意義。但同時訂單之間又是能夠并行消費的。算法
假如生產者產生了2條消息:M一、M2,要保證這兩條消息的順序,應該怎樣作?你腦中想到的多是這樣:服務器
你可能會采用這種方式保證消息順序網絡
M1發送到S1后,M2發送到S2,若是要保證M1先于M2被消費,那么須要M1到達消費端后,通知S2,而后S2再將M2發送到消費端。負載均衡
這個模型存在的問題是,若是M1和M2分別發送到兩臺Server上,就不能保證M1先達到,也就不能保證M1被先消費,那么就須要在MQ Server集群維護消息的順序。那么如何解決?一種簡單的方式就是將M一、M2發送到同一個Server上:分布式
保證消息順序,你改進后的方法ide
這樣能夠保證M1先于M2到達MQServer(客戶端等待M1成功后再發送M2),根據先達到先被消費的原則,M1會先于M2被消費,這樣就保證了消息的順序。svg
這個模型,理論上能夠保證消息的順序,但在實際運用中你應該會遇到下面的問題:性能
網絡延遲問題
只要將消息從一臺服務器發往另外一臺服務器,就會存在網絡延遲問題。如上圖所示,若是發送M1耗時大于發送M2的耗時,那么M2就先被消費,仍然不能保證消息的順序。即便M1和M2同時到達消費端,因為不清楚消費端1和消費端2的負載狀況,仍然有可能出現M2先于M1被消費。如何解決這個問題?將M1和M2發往同一個消費者便可,且發送M1后,須要消費端響應成功后才能發送M2。
但又會引入另一個問題,若是發送M1后,消費端1沒有響應,那是繼續發送M2呢,仍是從新發送M1?通常為了保證消息必定被消費,確定會選擇重發M1到另一個消費端2,就以下圖所示。
保證消息順序的正確姿式
這樣的模型就嚴格保證消息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種狀況,一種是M1確實沒有到達,另一種狀況是消費端1已經響應,可是Server端沒有收到。若是是第二種狀況,重發M1,就會形成M1被重復消費。也就是咱們后面要說的第二個問題,消息重復問題。
回過頭來看消息順序問題,嚴格的順序消息很是容易理解,并且處理問題也比較容易,要實現嚴格的順序消息,簡單且可行的辦法就是:
保證生產者 - MQServer - 消費者是一對一對一的關系
可是這樣設計,并行度就成為了消息系統的瓶頸(吞吐量不夠),也會致使更多的異常處理,好比:只要消費端出現問題,就會致使整個處理流程阻塞,咱們不得不花費更多的精力來解決阻塞的問題。
但咱們的最終目標是要集群的高容錯性和高吞吐量。這彷佛是一對不可調和的矛盾,那么阿里是如何解決的?
世界上解決一個計算機問題最簡單的方法:“剛好”不須要解決它!
有些問題,看起來很重要,但實際上咱們能夠經過合理的設計或者將問題分解來規避。若是硬要把時間花在解決它們身上,其實是浪費的,效率低下的。從這個角度來看消息的順序問題,咱們能夠得出兩個結論:
一、不關注亂序的應用實際大量存在
二、隊列無序并不意味著消息無序
最后咱們從源碼角度分析RocketMQ怎么實現發送順序消息。
通常消息是經過輪詢全部隊列來發送的(負載均衡策略),順序消息能夠根據業務,好比說訂單號相同的消息發送到同一個隊列。下面的示例中,OrderId相同的消息,會發送到同一個隊列:
// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
在獲取到路由信息之后,會根據MessageQueueSelector實現的算法來選擇一個隊列,同一個OrderId獲取到的隊列是同一個隊列。
private SendResult send() {
// 獲取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根據咱們的算法,選擇一個發送隊列
// 這里的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}
2、消息重復
上面在解決消息順序問題時,引入了一個新的問題,就是消息重復。那么RocketMQ是怎樣解決消息重復的問題呢?仍是“剛好”不解決。
形成消息的重復的根本緣由是:網絡不可達。只要經過網絡交換數據,就沒法避免這個問題。因此解決這個問題的辦法就是不解決,轉而繞過這個問題。那么問題就變成了:若是消費端收到兩條同樣的消息,應該怎樣處理?
一、消費端處理消息的業務邏輯保持冪等性
二、保證每條消息都有惟一編號且保證消息處理成功與去重表的日志同時出現
第1條很好理解,只要保持冪等性,無論來多少條重復消息,最后處理的結果都同樣。第2條原理就是利用一張日志表來記錄已經處理成功的消息的ID,若是新到的消息ID已經在日志表中,那么就再也不處理這條消息。
咱們能夠看到第1條的解決方式,很明顯應該在消費端實現,不屬于消息系統要實現的功能。第2條能夠消息系統實現,也能夠業務端實現。正常狀況下出現重復消息的幾率不必定大,且由消息系統實現的話,確定會對消息系統的吞吐量和高可用有影響,因此最好仍是由業務端本身處理消息重復的問題,這也是RocketMQ不解決消息重復的問題的緣由。
RocketMQ不保證消息不重復,若是你的業務須要保證嚴格的不重復消息,須要你本身在業務端去重。
總結
以上是生活随笔為你收集整理的rocketmq怎么保证数据不会重复_RocketMQ保证信息有序性和防止重复的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 设树采用孩子兄弟表示法存放.用类c语言设
- 下一篇: java记录代码执行位置_记录执行的ja