分布式事务——消息最终一致性方案
前言
分布式事務(wù)一直是服務(wù)化拆分后一個(gè)繞不開的話題,原來在單體應(yīng)用中執(zhí)行的多個(gè)邏輯操作,現(xiàn)在被拆分成了多個(gè)服務(wù)之間的遠(yuǎn)程調(diào)用。雖然服務(wù)化為我們的系統(tǒng)帶來了水平伸縮的能力,然而隨之而來挑戰(zhàn)就是分布式事務(wù)問題,多個(gè)服務(wù)之間使用自己?jiǎn)为?dú)維護(hù)的數(shù)據(jù)庫,它們彼此之間不在同一個(gè)事務(wù)中,假如A執(zhí)行成功了,B執(zhí)行卻失敗了,而A的事務(wù)此時(shí)已經(jīng)提交,無法回滾,那么最終就會(huì)導(dǎo)致兩邊數(shù)據(jù)不一致性的問題;盡管很早之前就有基于兩階段提交的XA分布式事務(wù),但是這類方案因?yàn)樾枰Y源的全局鎖定,導(dǎo)致性能極差;因此后面就逐漸衍生出了消息最終一致性、TCC等柔性事務(wù)的分布式事務(wù)方案,本文主要分析的是基于消息的最終一致性方案。
普通消息的處理流程
首先看一下普通消息的處理流程:
1.消息生成者發(fā)送消息
2.MQ收到消息,將消息進(jìn)行持久化,在存儲(chǔ)中新增一條記錄
3.返回ACK給消費(fèi)者
4.MQ push 消息給對(duì)應(yīng)的消費(fèi)者,然后等待消費(fèi)者返回ACK 5.如果消息消費(fèi)者在指定時(shí)間內(nèi)成功返回ack,那么MQ認(rèn)為消息消費(fèi)成功,在存儲(chǔ)中刪除消息,即執(zhí)行第6步;如果MQ在指定時(shí)間內(nèi)沒有收到ACK,則認(rèn)為消息消費(fèi)失敗,會(huì)嘗試重新push消息,重復(fù)執(zhí)行4、5、6步驟
5.MQ刪除消息
普通消息處理存在的一致性問題
我們以訂單創(chuàng)建為例,訂單系統(tǒng)先創(chuàng)建訂單(本地事務(wù)),再發(fā)送消息給下游處理;如果訂單創(chuàng)建成功,然而消息沒有發(fā)送出去,那么下游所有系統(tǒng)都無法感知到這個(gè)事件,會(huì)出現(xiàn)臟數(shù)據(jù);
如果先發(fā)送訂單消息,再創(chuàng)建訂單;那么就有可能消息發(fā)送成功,但是在訂單創(chuàng)建的時(shí)候卻失敗了,此時(shí)下游系統(tǒng)卻認(rèn)為這個(gè)訂單已經(jīng)創(chuàng)建,也會(huì)出現(xiàn)臟數(shù)據(jù)。
public void processOrder() {// 發(fā)送訂單處理成功消息(發(fā)送消息) sendBizMsg ();// 訂單處理(業(yè)務(wù)操作) orderService.process(); }一個(gè)錯(cuò)誤的想法
此時(shí)可能有同學(xué)會(huì)想,我們可否將消息發(fā)送和業(yè)務(wù)處理放在同一個(gè)本地事務(wù)中來進(jìn)行處理,如果業(yè)務(wù)消息發(fā)送失敗,那么本地事務(wù)就回滾,這樣是不是就能解決消息發(fā)送的一致性問題呢?
消息發(fā)送的異常情況分析
可能的情況一致性訂單處理成功,然后突然宕機(jī),事務(wù)未提交,消息沒有發(fā)送出去一致訂單處理成功,由于網(wǎng)絡(luò)原因或者M(jìn)Q宕機(jī),消息沒有發(fā)送出去,事務(wù)回滾一致訂單處理成功,消息發(fā)送成功,但是MQ由于其他原因,導(dǎo)致消息存儲(chǔ)失敗,事務(wù)回滾一致訂單處理成功,消息存儲(chǔ)成功,但是MQ處理超時(shí),從而ACK確認(rèn)失敗,導(dǎo)致發(fā)送方本地事務(wù)回滾不一致
從上面的情況分析,我們可以看到,使用普通的處理方式,無論如何,都無法保證業(yè)務(wù)處理與消息發(fā)送兩邊的一致性,其根本的原因就在于:遠(yuǎn)程調(diào)用,結(jié)果最終可能為成功、失敗、超時(shí);而對(duì)于超時(shí)的情況,處理方最終的結(jié)果可能是成功,也可能是失敗,調(diào)用方是無法知曉的。 筆者就曾經(jīng)在項(xiàng)目中出現(xiàn)類似的情況,調(diào)用方先在本地寫數(shù)據(jù),然后發(fā)起RPC服務(wù)調(diào)用,但是處理方由于DB數(shù)據(jù)量比較大,導(dǎo)致處理超時(shí),調(diào)用方在出現(xiàn)超時(shí)異常后,直接回滾本地事務(wù),從而導(dǎo)致調(diào)用
方這邊沒數(shù)據(jù),而處理方那邊數(shù)據(jù)卻已經(jīng)寫入了,最終導(dǎo)致兩邊業(yè)務(wù)數(shù)據(jù)的不一致。為了保證兩邊數(shù)據(jù)的一致性,我們只能從其他地方尋找新的突破口。
事務(wù)消息
由于傳統(tǒng)的處理方式無法解決消息生成者本地事務(wù)處理成功與消息發(fā)送成功兩者的一致性問題,因此事務(wù)消息就誕生了,它實(shí)現(xiàn)了消息生成者本地事務(wù)與消息發(fā)送的原子性,保證了消息生成者本地事務(wù)處理成功與消息發(fā)送成功的最終一致性問題。
事務(wù)消息處理的流程
事務(wù)消息與普通消息的區(qū)別就在于消息生產(chǎn)環(huán)節(jié),生產(chǎn)者首先預(yù)發(fā)送一條消息到MQ(這也被稱為發(fā)送half消息)
1.MQ接受到消息后,先進(jìn)行持久化,則存儲(chǔ)中會(huì)新增一條狀態(tài)為發(fā)送的消息然后返回ACK給消息生產(chǎn)者,此時(shí)MQ不會(huì)觸發(fā)消息推送事件
2.生產(chǎn)者預(yù)發(fā)送消息成功后,執(zhí)行本地事務(wù)
執(zhí)行本地事務(wù),執(zhí)行完成后,發(fā)送執(zhí)行結(jié)果給MQ
MQ會(huì)根據(jù)結(jié)果刪除或者更新消息狀態(tài)為可發(fā)送
3.如果消息狀態(tài)更新為可發(fā)送,則MQ會(huì)push消息給消費(fèi)者,后面消息的消費(fèi)和普通消息是一樣的
4.注意點(diǎn):由于MQ通常都會(huì)保證消息能夠投遞成功,因此,如果業(yè)務(wù)沒有及時(shí)返回ACK結(jié)果,那么就有可能造成MQ的重復(fù)消息投遞問題。因此,對(duì)于消息最終一致性的方案,消息的消費(fèi)者必須要對(duì)消息的消費(fèi)支持冪等,不能造成同一條消息的重復(fù)消費(fèi)的情況。
事務(wù)消息異常情況分析
異常情況一致性處理異常方法消息未存儲(chǔ),業(yè)務(wù)操作未執(zhí)行一致無存儲(chǔ)待發(fā)送消息成功,但是ACK失敗,導(dǎo)致業(yè)務(wù)未執(zhí)行(可能是MQ處理超時(shí)、網(wǎng)絡(luò)抖動(dòng)等原因)不一致MQ確認(rèn)業(yè)務(wù)操作結(jié)果,處理消息(刪除消息)存儲(chǔ)待發(fā)送消息成功,ACK成功,業(yè)務(wù)執(zhí)行(可能成功也可能失敗),但是MQ沒有收到生產(chǎn)者業(yè)務(wù)處理的最終結(jié)果不一致MQ確認(rèn)業(yè)務(wù)操作結(jié)果,處理消息(根據(jù)就業(yè)務(wù)處理結(jié)果,更新消息狀態(tài),如果業(yè)務(wù)執(zhí)行成功,則投遞消息,失敗則刪除消息)業(yè)務(wù)處理成功,并且發(fā)送結(jié)果給MQ,但是MQ更新消息失敗,導(dǎo)致消息狀態(tài)依舊為待發(fā)送不一致同上
支持事務(wù)消息的MQ
現(xiàn)在目前較為主流的MQ,比如ActiveMQ、RabbitMQ、Kafka、RocketMQ等,只有RocketMQ支持事務(wù)消息。據(jù)了解,早年阿里對(duì)MQ增加事務(wù)消息也是因?yàn)橹Ц秾毮沁呉驗(yàn)闃I(yè)務(wù)上的需求而產(chǎn)生的。因此,如果我們希望強(qiáng)依賴一個(gè)MQ的事務(wù)消息來做到消息最終一致性的話,在目前的情況下,技術(shù)選型上只能去選擇RocketMQ來解決。上面我們也分析了事務(wù)消息所存在的異常情況,即MQ存儲(chǔ)了待發(fā)送的消息,但是MQ無法感知到上游處理的最終結(jié)果。對(duì)于RocketMQ而言,它的解決方案非常的簡(jiǎn)單,就是其內(nèi)部實(shí)現(xiàn)會(huì)有一個(gè)定時(shí)任務(wù),去輪訓(xùn)狀態(tài)為待發(fā)送的消息,然后給producer發(fā)送check請(qǐng)求,而producer必須實(shí)現(xiàn)一個(gè)check監(jiān)聽器,監(jiān)聽器的內(nèi)容通常就是去檢查與之對(duì)應(yīng)的本地事務(wù)是否成功(一般就是查詢DB),如果成功了,則MQ會(huì)將消息設(shè)置為可發(fā)送,否則就刪除消息。
常見的問題
問:如果預(yù)發(fā)送消息失敗,是不是業(yè)務(wù)就不執(zhí)行了?
答:是的,對(duì)于基于消息最終一致性的方案,一般都會(huì)強(qiáng)依賴這步,如果這個(gè)步驟無法得到保證,那么最終也 就不可能做到最終一致性了。
問:為什么要增加一個(gè)消息預(yù)發(fā)送機(jī)制,增加兩次發(fā)布出去消息的重試機(jī)制,為什么不在業(yè)務(wù)成功之后,發(fā)送失敗的話使用一次重試機(jī)制?
答:如果業(yè)務(wù)執(zhí)行成功,再去發(fā)消息,此時(shí)如果還沒來得及發(fā)消息,業(yè)務(wù)系統(tǒng)就已經(jīng)宕機(jī)了,系統(tǒng)重啟后,根本沒有記錄之前是否發(fā)送過消息,這樣就會(huì)導(dǎo)致業(yè)務(wù)執(zhí)行成功,消息最終沒發(fā)出去的情況。
如果consumer消費(fèi)失敗,是否需要producer做回滾呢?
答:這里的事務(wù)消息,producer不會(huì)因?yàn)閏onsumer消費(fèi)失敗而做回滾,采用事務(wù)消息的應(yīng)用,其所追求的是高可用和最終一致性,消息消費(fèi)失敗的話,MQ自己會(huì)負(fù)責(zé)重推消息,直到消費(fèi)成功。因此,事務(wù)消息是針對(duì)生產(chǎn)端而言的,而消費(fèi)端,消費(fèi)端的一致性是通過MQ的重試機(jī)制來完成的。
如果consumer端因?yàn)闃I(yè)務(wù)異常而導(dǎo)致回滾,那么豈不是兩邊最終無法保證一致性?
答:基于消息的最終一致性方案必須保證消費(fèi)端在業(yè)務(wù)上的操作沒障礙,它只允許系統(tǒng)異常的失敗,不允許業(yè)務(wù)上的失敗,比如在你業(yè)務(wù)上拋出個(gè)NPE之類的問題,導(dǎo)致你消費(fèi)端執(zhí)行事務(wù)失敗,那就很難做到一致了。
由于并非所有的MQ都支持事務(wù)消息,假如我們不選擇RocketMQ來作為系統(tǒng)的MQ,是否能夠做到消息的最終一致性呢?答案是可以的。
基于本地消息的最終一致性
基于本地消息的最終一致性方案的最核心做法就是在執(zhí)行業(yè)務(wù)操作的時(shí)候,記錄一條消息數(shù)據(jù)到DB,并且消息數(shù)據(jù)的記錄與業(yè)務(wù)數(shù)據(jù)的記錄必須在同一個(gè)事務(wù)內(nèi)完成,這是該方案的前提核心保障。在記錄完成后消息數(shù)據(jù)后,后面我們就可以通過一個(gè)定時(shí)任務(wù)到DB中去輪訓(xùn)狀態(tài)為待發(fā)送的消息,然后將消息投遞給MQ。這個(gè)過程中可能存在消息投遞失敗的可能,此時(shí)就依靠重試機(jī)制來保證,直到成功收到MQ的ACK確認(rèn)之后,再將消息狀態(tài)更新或者消息清除;而后面消息的消費(fèi)失敗的話,則依賴MQ本身的重試來完成,其最后做到兩邊系統(tǒng)數(shù)據(jù)的最終一致性。基于本地消息服務(wù)的方案雖然可以做到消息的最終一致性,但是它有一個(gè)比較嚴(yán)重的弊端,每個(gè)業(yè)務(wù)系統(tǒng)在使用該方案時(shí),都需要在對(duì)應(yīng)的業(yè)務(wù)庫創(chuàng)建一張消息表來存儲(chǔ)消息。針對(duì)這個(gè)問題,我們可以將該功能單獨(dú)提取出來,做成一個(gè)消息服務(wù)來統(tǒng)一處理,因而就衍生出了我們下面將要討論的方案。
獨(dú)立消息服務(wù)的最終一致性
獨(dú)立消息服務(wù)最終一致性與本地消息服務(wù)最終一致性最大的差異就在于將消息的存儲(chǔ)單獨(dú)地做成了一個(gè)RPC的服務(wù),這個(gè)過程其實(shí)就是模擬了事務(wù)消息的消息預(yù)發(fā)送過程,如果預(yù)發(fā)送消息失敗,那么生產(chǎn)者業(yè)務(wù)就不會(huì)去執(zhí)行,因此對(duì)于生產(chǎn)者的業(yè)務(wù)而言,它是強(qiáng)依賴于該消息服務(wù)的。不過好在獨(dú)立消息服務(wù)支持水平擴(kuò)容,因此只要部署多臺(tái),做成HA的集群模式,就能夠保證其可靠性。在消息服務(wù)中,還有一個(gè)單獨(dú)地定時(shí)任務(wù),它會(huì)定期輪訓(xùn)長(zhǎng)時(shí)間處于待發(fā)送狀態(tài)的消息,通過一個(gè)check補(bǔ)償機(jī)制來確認(rèn)該消息對(duì)應(yīng)的業(yè)務(wù)是否成功,如果對(duì)應(yīng)的業(yè)務(wù)處理成功,則將消息修改為可發(fā)送,然后將其投遞給MQ;如果業(yè)務(wù)處理失敗,則將對(duì)應(yīng)的消息更新或者刪除即可。因此在使用該方案時(shí),消息生產(chǎn)者必須同時(shí)實(shí)現(xiàn)一個(gè)check服務(wù),來供消息服務(wù)做消息的確認(rèn)。對(duì)于消息的消費(fèi),該方案與上面的處理是一樣,都是通過MQ自身的重發(fā)機(jī)制來保證消息被消費(fèi)。
總結(jié)
以上是生活随笔為你收集整理的分布式事务——消息最终一致性方案的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深度讲解:同步/异步/阻塞/非阻塞/BI
- 下一篇: 为什么用redis?