ENode 2.0 - 深入分析ENode的内部实现流程和关键地方的幂等设计
前言
ENode是一個基于消息的架構(gòu),使用ENode開發(fā)的系統(tǒng),每個環(huán)節(jié)都是處理消息,處理完后產(chǎn)生新的消息。本篇文章我想詳細分析一下ENode框架內(nèi)部是如何實現(xiàn)整個消息處理流程的。為了更好的理解我后面的流程的描述,我覺得還是應該先把ENode的架構(gòu)圖貼出來,好讓大家在看后面的分析時,可以對照這個架構(gòu)圖進行思考和理解。
ENode架構(gòu)圖
ENode框架內(nèi)部實現(xiàn)流程分析
Command的冪等處理
上面流程中的第8步,Command會被添加到ICommandStore。這里,實際上我添加到ICommandStore的是一個HandleCommand對象,該對象包含當前的Command之外,還有當前被修改的聚合根ID。這樣做的理由請看我后面的解釋。我們知道ICommandStore會對CommandId作為主鍵,這樣我們就能絕對保證一個Command不會被重復添加。如果Command添加到ICommandStore成功,那自然最好了,直接進入后續(xù)的步驟即可;但是如果出現(xiàn)CommandId重復的時候,我們需要做怎么樣的處理呢?
如果出現(xiàn)重復,則需要根據(jù)CommandId(主鍵),把之前已經(jīng)持久化過的HandledCommand取出來;然后然后我們從HandledCommand拿到被修改的聚合根ID,然后最關(guān)鍵的一步:我們將該聚合根ID以及CommandId作為條件從IEventStore中查詢出一個可能存在的EventStream。如果存在,就說明這個Command所產(chǎn)生的Domain Event已經(jīng)被持久化了,所以我們只要再做一遍發(fā)布事件的操作即可。即調(diào)用IEventPublisher.Publish方法來發(fā)布事件到Query Side。那么為什么要發(fā)布呢?因為雖然事件被持久化了,但并不代表已經(jīng)成功被發(fā)布出去了。因為理論上有可能Domain Event被持久化成功了,但是在要發(fā)布事件的時候,斷電了!所以這種情況下,重啟服務器就會出現(xiàn)這里討論的情況了。所以我們需要再次Publish事件。
然后,如果沒有根據(jù)CommandId和聚合根ID查找到EventStream呢?那也好辦,因為這種情況就說明這個Command雖然被持久化了,但是它所產(chǎn)生的EventStream卻沒有被持久化到EventStore,所以我們需要將當前的EventStream調(diào)用IEventService.Commit方法進行持久化事件。
另外,這里其實有一個疑問,為什么查找EventStream不能僅僅根據(jù)CommandId呢?原因是:從技術(shù)上來說,我們可以只根據(jù)CommandId來查找到一個唯一的EventStream,但這樣設計的話,就要求EventStore必須要支持通過一個CommandId來全局唯一定位到一個EventStream了。但是因為考慮到EventStore的數(shù)據(jù)量是非常大的,我們以后可能會根據(jù)聚合根ID做水平拆分(sharding)。這樣的話,我們僅僅靠CommandId就無法知道到哪個分片下去查找對應的EventStream了。所以,如果查詢時,能同時指定聚合根ID,那我們就能輕松知道首先到哪個分片下去找EventStream,然后再根據(jù)CommandId就能輕松定位到一個唯一的EventStream了。
既然說到這里,我再說一下CommandStore的水平分割的設計吧,CommandStore的數(shù)據(jù)量也是非常大的,因為它會存儲所有的Command。不過幸好,我們對于CommandStore只需要根據(jù)CommandId去查找即可,所以我們可以根據(jù)CommandId來做Hash取模的方式來水平拆分。這樣即便是分片了,我們只要知道了一個給定的CommandId,也能知道它當前是在哪個分片下的,就很容易找到該Command了。
所以,通過上面的分析,我們知道了CommandStore和EventStore在設計上不僅僅考慮了如何存儲數(shù)據(jù),還考慮了未來大數(shù)據(jù)量時如何分片,以及如何在分片的情況下仍然能方便的查找到我們的數(shù)據(jù)。
最后,上面還有一種情況沒有說明,就是當出現(xiàn)Command添加到CommandStore時發(fā)現(xiàn)重復,但是嘗試從CommandStore中根據(jù)CommandId查詢該Command時,發(fā)現(xiàn)查不到,天哪!這種情況實際上不應該出現(xiàn),如果出現(xiàn),那說明CommandStore內(nèi)部有問題了。因為為什么添加時說有重復,而查詢卻差不多來呢?呵呵。這種情況就無法處理了,我們只能記錄錯誤日志,然后進行后續(xù)的排查。
Domain Event持久化時的并發(fā)沖突檢測和處理
上面流程中的第10步,我們提到:如果遇到EventStream持久化到IEventStore時遇到版本號重復(同一個聚合根ID+聚合根的Version相同,則認為有并發(fā)沖突),此時框架需要做不同的邏輯處理。具體是:
首先,我們可以先想想為什么會出現(xiàn)同一個聚合根會在幾乎同一時刻產(chǎn)生兩個版本號一樣的領(lǐng)域事件,并持久化到EventStore。首先,我先說一下這種情況幾乎不會出現(xiàn)的理由:ENode中,在ICommandExecutor在處理一個Command時,會檢查當前該Command所要修改的聚合根是否已經(jīng)有至少一個聚合根正在被處理,如果有,則會將當前Command排入到這個聚合根所對應的等候隊列。也就是說,它暫時不會被執(zhí)行。然后當當前聚合根的前面的Command被執(zhí)行完了后才會從這個等候隊列取出下一個等待的Command進行處理。通過這樣的設計,我們保證了,對一個聚合根的所有Command,不會并行被執(zhí)行,只會按照順序被執(zhí)行。因為每個ICommandExecutor會在需要的時候,為某個聚合根自動創(chuàng)建這種等候隊列,只要對該聚合根的Command同一時刻進來2個或以上。
那么,要是集群的時候呢?你一臺機器的話,通過上面的方式可以確保一個聚合根實例的所有的Command會被順序處理。但是集群的時候,可能一個聚合根會在多臺機器被同時處理了。要解決這個問題的思路就是對Command按照聚合根ID進行路由了,因為一般只要是修改聚合根的Command總是會帶有一個聚合根ID,所以我們可以按照這個特性,對被發(fā)送的Command按照聚合根ID進行路由。只要CommandId相同,則總是會被路由到同一個隊列,然后因為一個隊列總是只會被一臺機器消費,從而我們能保證對同一個聚合根的Command總是會落到一臺機器上被處理。那么你可能會說,要是熱點數(shù)據(jù)呢?比如有些聚合根突然對他修改的Command可能非常多(增加了一倍),而有些則很少,那怎么辦呢?沒關(guān)系,我們還有消息隊列的監(jiān)控平臺。當出現(xiàn)某個聚合根的Command突然非常多的時候,我們可以借助于EQueue的Topic的Queue可以隨時進行增加的特性來應付這個問題。比如原來這個Topic下只有4個Queue,那現(xiàn)在增加到8個,然后消費者機器也從4臺增加到8臺。這樣相當于Command的處理能力也增加了一倍。從而可以方便的解決熱點數(shù)據(jù)問題。因此,這也是我想要自己實現(xiàn)分布式消息隊列EQueue的原因啊!有些場景,要是自己沒有辦法完全掌控,會很被動,直接導致整個架構(gòu)的嚴重缺陷,最后導致系統(tǒng)癱瘓,而自己卻無能為了。當然你可以說我們可以使用Kafka, Rocketmq這樣的高性能分布式隊列,確實。但是畢竟這種高大上的隊列非常復雜,且都是非.NET平臺。除了問題,維護起來肯定比自己開發(fā)的要難維護。當然除非你對它們非常精通且有自信的運維能力。
通過上面的思路實現(xiàn)的,確保聚合根的Command總是被順序線性處理的設計,對EventStore有非常大的意義。因為這樣可以讓EventStore不會出現(xiàn)并發(fā)沖突,從而不會造成無謂的對EventStore的訪問,也可以極大的降低EventStore的壓力。
但是什么時候還是可能會出現(xiàn)并發(fā)沖突呢?因為:
1)當處理Command的某臺機器掛了,然后這臺機器所消費的Queue里的消息就會被其他機器接著消費。其他機器可能會從這個Queue里批量拉取一些Command消息來消費。然后此時假如我們重啟了這臺有問題的服務器,重啟完之后,因為又會開始消費這個Queue。然后一個關(guān)鍵的點是,每次一臺機器啟動時,會從EQueue的Broker拉取這個Queue最后一個被消費的消息的位置,也就是Offset,而由于這個Offset的更新是異步的,比如5s才會更新到EQueue的Broker,所以導致這臺重啟后的服務器從Broker上拉取到的消費位置其實是有延遲的,從而就可能會去消費在那臺之前接替你的服務器已經(jīng)消費過的或者正在消費的Command消息了。當然這種情況因為條件太苛刻,所以基本不會發(fā)生,即便會發(fā)生,一般也不會導致Command的并發(fā)執(zhí)行。但是這畢竟也是一種可能性。實際上這里不僅僅是某個服務器掛掉后再重啟的情況會導致并發(fā)沖突,只要是處理Comand的機器的集群中有任何的機器的增加或減少,由于都會導致Command消息的消費者集群重新負載均衡。在這個負載均衡的過程中,就會導致同一個Topic下的同一個Queue里的部分消息可能會在兩臺服務器上被消費。原因是Queue的消費位置(offset)的更新不是實時的,而是定時的。所以,我們一般建議,盡量不要在消息很多的時候做消費者集群內(nèi)機器的變動,而是盡量在沒什么消息的時候,比如凌晨4點時,做集群的擴容操作。這樣可以盡量避免所有可能帶來的消息重復消費或者并發(fā)沖突的可能性。呵呵,這段話也許很多人看的云里霧里,我只能說到這個程度了,也許要完全理解,大家還需要對EQueue的設計很清楚才行!
2)就算同一個機器內(nèi),其實也是有可能出現(xiàn)對同一個聚合根的并發(fā)修改,也就是針對同一個聚合根的兩個Command被同時執(zhí)行。原因是:當一個Command所對應的EventStream在被持久化時出現(xiàn)重復,然后我就會放在一個本地的內(nèi)存隊列進行重試,然后重試由于是在另一個專門的重試線程里,該線程不是正常處理Command的線程。所以假如對該聚合根后續(xù)還有Command要被處理,那就有可能會出現(xiàn)同一時刻,一個聚合根被兩個Command修改的情況了。
現(xiàn)在,我們在回來討論,假如遇到?jīng)_突時,要怎么做?這個上面我簡單提到過,就是需要重試Command。但也不是這么簡單的邏輯。我們需要:
a. 先檢查當前的EventStream的Version是否為1,假如為1,說明有一個創(chuàng)建聚合根的Command被并發(fā)執(zhí)行了。此時我們無須在重試了,因為即便再重試,那最后產(chǎn)生的EventStream的版本號也總是1,因為只要是第一次創(chuàng)建聚合根,那這個聚合根所產(chǎn)生的DomainEvent的版本總是1。所以這種情況下,我們只需要直接從EventStore拿出這個已經(jīng)存在的EventStream,然后通過IEventPublisher.Publish方法發(fā)布該EventStream即可。為什么要再次發(fā)布,上面解釋Command的冪等時,也解釋了原因,這里是一樣的原因。這里也有一個小的點需要注意,就是假如嘗試從EventStore拿出這個EventStream時,假如沒獲取到呢?這個問題實際上不應該出現(xiàn),原因就像上面分析Command冪等時一樣,為什么會出現(xiàn)添加時提示存在,但查詢時卻查不到的情況呢?這種情況就是EventStore的設計有問題了,讀寫存在非強一致性的情況了。
b. 如果當前的EventStream的Version大于1,則我們需要先更新內(nèi)存緩存(Redis),然后做Command的重試處理。為什么要先更新緩存呢?因為如果不更新,有可能重試時,拿到的聚合根的狀態(tài)還是舊的,所以重試后還是導致版本號沖突。那為什么從緩存中拿到的聚合根的狀態(tài)可能還是舊的呢?因為EventStream已經(jīng)存在于EventStore并不代表這個EventStream的修改已經(jīng)更新到緩存了。因為我們是先持久化到EventStore,在更新緩存的。完全有可能你還沒來得及更新緩存的時候,另一個Command正好需要重試呢!所以,最保險的做法,就是再重試的時候?qū)⒕彺嬷械木酆细鶢顟B(tài)更新到最新值。那怎么更新呢?呵呵,很簡單,就是通過事件溯源(即Event Sourcing技術(shù))了。我們只要從Event Store獲取當前聚合根的所有的Event Stream,然后溯源這些事件,最后就能得到聚合根的最新版本的狀態(tài)了,然后更新到緩存即可。
最后,如果需要重試的話,要怎么重試呢?很簡單,只要扔到一個本地的基于內(nèi)存的重試隊列即可。我現(xiàn)在是用BlockingCollection的。
如何保證事件產(chǎn)生的順序和被消費的順序相同
為什么要保證這個相同的順序,在上面的流程步驟介紹里已經(jīng)說明了。這里我們分析一下如何實現(xiàn)這個順序的一致。基本的思路是用一個表,存放所有聚合根當前已經(jīng)處理過的最大版本號,假如當前已經(jīng)處理過的最大版本號是10,那接下來只能處理這個聚合根版本號為11的EventStream。即便Version=12或者更后面的先過來,也只能等著。那怎么等呢?也是類似Command的重試隊列一樣,在一個本地的內(nèi)存隊列等就行了。比如現(xiàn)在最大已處理的版本號是10,然后現(xiàn)在12,13這兩個版本號的EventStream先過來,那就先到隊列等著,然后版本號是11的這個事件過來了,就可以處理。處理好之后,當前最大已處理的版本號就編程11了,所以等候隊列中的版本號為12的EventStream就可以允許被處理了。整個控制邏輯就是這樣。那么這是單機的算法,要是集群呢?實際上這不必考慮集群的情況,因為我們每臺機器上都是這個順序控制邏輯,所以如果是集群,那最多可能出現(xiàn)的情況(實際上這種情況存在的可能性也是非常的低)是,版本號為11的EventStream被并發(fā)的處理。這種情況就是我下面要分析的。
這里實際上還有一個細節(jié)我還沒說到,這個細節(jié)和EQueue的Consumer的ConsumerGroup相關(guān),就是假如一種消息,有很多Consumer消費,然后這些Consumer假如分為兩個ConsumerGroup,那這兩個ConsumerGroup的消費是相互隔離的。也就是說,所有這些消息,兩個ConsumerGroup內(nèi)的Consumer都會消費到。這里如果不做一些其他的設計,可能會在用戶使用時遇到潛在的問題。這里我沒辦法說的很清楚,說的太清楚估計會讓大家思維更混亂,且因為這個點不是重點。所以就不展開了。有興趣的朋友可以看一下ENode中的EventPublishInfo表中的EventProcessorName字段的用意。
如何保證一個IDomainEvent只會被一個IEventHandler處理一次
這一條的原因,我想大家都能理解。比如一個Event Handler是更新讀庫的,可能我們會執(zhí)行一條有副作用的SQL,比如update product set price = price + 1 where id = 1000。這條SQL如果被重復執(zhí)行一次,那price字段的值就多了1了,這不是我們所期望的結(jié)果。所以框架需要有基本的責任可以基本避免這種情況的發(fā)生。那怎么實現(xiàn)呢?思路也是用一張表,記錄被執(zhí)行的DomainEvent的ID以及當前處理這個DomainEvent的Event Handler的類型的一個Code,對這兩個聯(lián)合字段做聯(lián)合唯一索引。每次當一個Event Handler要處理一個Domain Event時,先判斷是否已經(jīng)處理過,如果沒處理過,則處理,處理過之后把被處理的Domain Event ID和EventHandler Type Code添加到這個表里即可。那假如添加的時候失敗了呢?因為有可能也會有并發(fā)的情況,導致Event Handler重復處理同一個Domain Event。這種情況框架就不做嚴謹?shù)奶幚砹?#xff0c;因為框架本身也無法做到。因為框架式無法知道Event Handler里面到底在做什么的。有可能是發(fā)送郵件,也有可能是記錄日志,也可能是更新讀取(Read DB)。所以,最根本的,還是要求Event Handler內(nèi)部,也就是開發(fā)這自己需要考慮冪等的實現(xiàn)。當然框架會提供給開發(fā)者必要的信息來幫助他們完成嚴謹冪等控制。比如框架會提供當前Domain Event 的版本號給Event Handler,這樣Event Handler里就能在Update SQL的Where部分把這個Version帶上,從而實現(xiàn)樂觀并發(fā)控制。比如下面的代碼示例:
public void Handle(IEventContext context, SectionNameChangedEvent evnt) {TryUpdateRecord(connection =>{return connection.Update(new{Name = evnt.Name,UpdatedOn = evnt.Timestamp,Version = evnt.Version},new{Id = evnt.AggregateRootId,Version = evnt.Version - 1}, Constants.SectionTable);}); }上面的代碼中,當我們更新一個論壇的版塊時,我們可以在sql的where條件中,用version = evnt.Verion - 1這樣的條件。從而確保當前你要處理的事件一定是上一次已處理的事件的版本號的下一個版本號,也就是保證了Query Side的更新的順序嚴格和事件產(chǎn)生的順序一致。這樣即便框架在有漏網(wǎng)之魚的時候,Event Handler內(nèi)部也能做嚴謹?shù)捻樞蚩刂啤.斎蝗绻愕腅vent Handler是發(fā)送郵件,那我還真不知道該如何進一步保證這個嚴謹?shù)捻樞蚧蛘卟l(fā)沖突了,呵呵。有興趣的朋友可以和我交流。
在Saga Process Manager中產(chǎn)生的ICommand如何能夠支持重試發(fā)送而不導致操作的重復
終于到最后一點了,好累。堅持就是勝利!假如現(xiàn)在的Saga Event Handler里是會產(chǎn)生Command,那框架在發(fā)送這些Command時,要確保不能重復執(zhí)行。怎么辦呢?假如在Saga Event Handler里產(chǎn)生的Command的Id每次都是新new出來的一個唯一的值,那框架就無法知道這個Command是否和之前的重復了,框架會認為這是兩個不同的Command。這里其實有兩種做法:
1. 框架先把Saga Event Handler中產(chǎn)生的Command保存起來,然后慢慢發(fā)送到EQueue。發(fā)送成功一個,就刪除一個。直到全部發(fā)送完為止。這種做法是可行的,因為這樣一來,我們要發(fā)送的Command就總是從存儲這些Command的地方去拿了,所以不會出現(xiàn)每次要發(fā)送的同一個Command的ID都是不同的情況。但是這種設計性能不是太好,因為要發(fā)送的Command必須要先被保存起來,然后再發(fā)送,發(fā)送完之后還要刪除。性能上肯定不會太高。
2.第二種做法是,Command不存儲起來,而是直接把Saga Event Handler中產(chǎn)生的Command拿去發(fā)送。但這種設計要求:框架對這種要發(fā)送的Command的ID總是按照某個特定的規(guī)則來產(chǎn)生的。這種規(guī)則要保證產(chǎn)生的CommandId首先是要唯一的,其次是確定的。下面我們看一下下面的代碼:
private string BuildCommandId(ICommand command, IDomainEvent evnt, int eventHandlerTypeCode) {var key = command.GetKey();var commandKey = key == null ? string.Empty : key.ToString();var commandTypeCode = _commandTypeCodeProvider.GetTypeCode(command.GetType());return string.Format("{0}{1}{2}{3}", evnt.Id, commandKey, eventHandlerTypeCode, commandTypeCode); }上面這個代碼是一個函數(shù),用來構(gòu)建要被發(fā)送的Command的Id的,我們可以看到ID是由Command的一個key+要被發(fā)送的Command的類型的code+當前被處理的Domain Event的ID,以及當前的Saga Event Handler的類型的code這四個信息組成。對于同一個Domain Event被同一個Event Handler處理,然后如果產(chǎn)生的Command的類型也是一樣的,那我們基本可以通過這三個信息構(gòu)建一個唯一的CommandId了,但是有時這樣還不夠,因為我們可能在一個Event Handler里構(gòu)建兩個類型完全一樣的Command,但是他們修改的聚合根的ID不同。所以,我上面才有一個commandKey的組成部分。這個key默認就是這個Command要修改的聚合根的ID。這樣,通過這樣4個信息的組合,我們可以確保不管某個Domain Event被某個Saga Event Handler處理多少次,最后產(chǎn)生的Command的ID總是確定的,不變的。當然上面的commandKey有時僅僅考慮聚合根ID可能還不夠,雖然我還沒遇到過這種情況,呵呵。所以我框架設計上,就允許開發(fā)者能重新GetKey方法,開發(fā)者需要理解何時需要重寫這個方法。看了這里的說明應該就知道了!
好了,差不多了,該睡覺了!
總結(jié)
以上是生活随笔為你收集整理的ENode 2.0 - 深入分析ENode的内部实现流程和关键地方的幂等设计的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php 图片上传预览(转)
- 下一篇: 打开Delphi 10.1 berlin