通过Dapr实现一个简单的基于.net的微服务电商系统(十九)——分布式事务之Saga模式...
目錄:
一、通過Dapr實現一個簡單的基于.net的微服務電商系統
二、通過Dapr實現一個簡單的基于.net的微服務電商系統(二)——通訊框架講解
三、通過Dapr實現一個簡單的基于.net的微服務電商系統(三)——一步一步教你如何擼Dapr
四、通過Dapr實現一個簡單的基于.net的微服務電商系統(四)——一步一步教你如何擼Dapr之訂閱發布
通過Dapr實現一個簡單的基于.net的微服務電商系統(五)——一步一步教你如何擼Dapr之狀態管理
通過Dapr實現一個簡單的基于.net的微服務電商系統(六)——一步一步教你如何擼Dapr之Actor服務
通過Dapr實現一個簡單的基于.net的微服務電商系統(七)——一步一步教你如何擼Dapr之服務限流
通過Dapr實現一個簡單的基于.net的微服務電商系統(八)——一步一步教你如何擼Dapr之鏈路追蹤
通過Dapr實現一個簡單的基于.net的微服務電商系統(九)——一步一步教你如何擼Dapr之OAuth2授權
通過Dapr實現一個簡單的基于.net的微服務電商系統(九)——一步一步教你如何擼Dapr之OAuth2授權-百度版
通過Dapr實現一個簡單的基于.net的微服務電商系統(十)——一步一步教你如何擼Dapr之綁定
通過Dapr實現一個簡單的基于.net的微服務電商系統(十一)——一步一步教你如何擼Dapr之自動擴/縮容
通過Dapr實現一個簡單的基于.net的微服務電商系統(十二)——istio+dapr構建多運行時服務網格
通過Dapr實現一個簡單的基于.net的微服務電商系統(十三)——istio+dapr構建多運行時服務網格之生產環境部署
通過Dapr實現一個簡單的基于.net的微服務電商系統(十四)——開發環境容器調試小技巧
通過Dapr實現一個簡單的基于.net的微服務電商系統(十五)——集中式接口文檔實現
通過Dapr實現一個簡單的基于.net的微服務電商系統(十六)——dapr+sentinel中間件實現服務保護
通過Dapr實現一個簡單的基于.net的微服務電商系統(十七)——服務保護之動態配置與熱重載
通過Dapr實現一個簡單的基于.net的微服務電商系統(十八)——服務保護之多級緩存
附錄:(如果你覺得對你有用,請給個star)
一、電商Demo地址:https://github.com/sd797994/Oxygen-Dapr.EshopSample
二、通訊框架地址:https://github.com/sd797994/Oxygen-Dapr
三、Saga框架地址:https://github.com/sd797994/Oxygen-Saga
一、什么是Saga
? ? ? ? 在領域驅動設計中,由于領域邊界的存在,以往的分層設計中業務會按照其固有的領域知識被切分到不同的限界中,并且引入了領域事件這一概念來降低單個業務的復雜度,通過非耦合的事件驅動來完成復雜的業務。但是事件驅動帶來了一些新的問題,由于以往一個原子性極強的邏輯被拆散到了一個一個小的領域中,原子性事務數據的強一致性無法被保證。為了解決這個問題,一般會采用事務補償的方式來確保最終一致。
? ? ? ? 當我們采用多個本地事務組合去進行業務處理時,由于業務其本身的復雜性,往往需要在多個事務中協調。而事件協調器(saga)就是一個專門降低其復雜度的設計,開發人員原則上只需要將事件和補償按照一定順序注冊到協調處理器中,原則上協調器會按照注冊的事件依次執行,若出現事件執行失敗時,也會按照補償列表進行相應的回滾。
? ? ? ? 去年曾經在一篇文章里聊過Saga,鏈接在此。相比去年的那個小DEMO。我在此基礎上基于Dapr的狀態管理完成了另外一個開源項目Oxygen-Saga,地址:https://github.com/sd797994/Oxygen-Saga。當然這個項目既可以引入到dapr的環境中完成Saga事務,本身也可以獨立的基于rabbitmq+redis來完成非Dapr環境下的Saga事務。今天主要講解一下如何在Dapr環境下引入Saga來實現一個分布式事務。
首先我們還是看看目前的一個訂單下單邏輯,可以看到基于Actor服務,我們的請求是在訂單服務里直連商品服務Actor完成庫存減扣的,并通過Actor的周期性持久化機制完成事務落庫,所以實際上是把分布式事務的復雜性通過Actor屏蔽掉了,并沒有涉及到真正的分布式事務。
從代碼層面也可以看到,整個事務其實是在同一個方法里以同步調用Actor的方式完成的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | #region 私有遠程服務包裝器方法 async Task<List<OrderGoodsSnapshot>> GetGoodsListByIds(IEnumerable<Guid> input) { ????return?(await goodsQueryService.GetGoodsListByIds(new?GetGoodsListByIdsDto(input))).GetData<List<OrderGoodsSnapshot>>(); } async Task<bool> DeductionGoodsStock(CreateOrderDeductionGoodsStockDto input) { ????var?data = input.CopyTo<CreateOrderDeductionGoodsStockDto, DeductionStockDto>(); ????data.ActorId = input.GoodsId.ToString(); ????return?(await goodsActorService.DeductionGoodsStock(data)).GetData<bool>(); } async Task<bool> UnDeductionGoodsStock(CreateOrderDeductionGoodsStockDto input) { ????var?data = input.CopyTo<CreateOrderDeductionGoodsStockDto, DeductionStockDto>(); ????data.ActorId = input.GoodsId.ToString(); ????return?(await goodsActorService.UnDeductionGoodsStock(data)).GetData<bool>(); } #endregion |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | public?async Task<ApiResult> CreateOrder(OrderCreateDto input) { ????var?mockUser = (await accountQueryService.GetMockAccount()).GetData<CurrentUser>(); ????//申明一個創建訂單領域服務實例,將遠程rpc調用作為匿名函數傳遞進去 ????var?createOrderService =?new?CreateOrderService(GetGoodsListByIds, DeductionGoodsStock, UnDeductionGoodsStock); ????return?await ApiResult.Ok("訂單創建成功!").RunAsync(async () => ????{ ????????var?order = await createOrderService.CreateOrder(mockUser.Id, mockUser.UserName, mockUser.Address, mockUser.Tel, input.Items.CopyTo<OrderCreateDto.OrderCreateItemDto, OrderItem>().ToList());//通過訂單服務創建訂單 ????????repository.Add(order); ????????if?(await?new?CheckOrderCanCreateSpecification(repository).IsSatisfiedBy(order)) ????????????await unitofWork.CommitAsync(); ????????await eventBus.SendEvent(EventTopicDictionary.Order.CreateOrderSucc,?new?OperateOrderSuccessEvent(order, mockUser.UserName));//發送訂單創建成功事件 ????}, ????//失敗回滾 ????createOrderService.UnCreateOrder); } |
而在Saga方案里就變得不一樣了,我們需要借助Saga的流程管理器在多個實例中流轉我們的事務,通過事件訂閱和發布來最終完成一個分布式事務。具體的調用流程如下:
可以看到整個流程完全依賴于SagaManger來提供對應的調用策略,而作為開發人員只需要為每個策略提供對應的委托函數,對應的具體流程如下:
一、首先是訂單服務和商品服務在系統初始化時需要注冊對應的配置文件到本地Saga管理器,并且需要提供對應的流程處理委托(包括業務事件委托+補償事件委托+異常處理委托)。
二、在客戶發起一個訂單時,只需要創建一個Saga流程實例,剩下的就交給Saga,它會自動幫我們流轉整個業務邏輯而無需人工插手。
二、talk is cheap, show me the code
首先我們需要為整個訂單創建流程設計一組Topic:
接著我們創建一個Saga配置注冊這組topic進去,流程比較簡單,第一步是扣除庫存,下一步是創建訂單,補償事件只有一個就是庫存回滾:
再然后我們需要創建這些Topic對應的事件處理器:
訂單服務:
商品服務:?
接著我們在各自的服務里去實現它們:
商品服務:
訂單服務:
然后我們在訂單用例里創建一個Action用于啟動saga流程(注意是第二個方法):
?最后我們在商品和訂單服務中引入saga組件并注冊事件和異常回調委托(注冊代碼相似,此處僅展示其中一個服務的):
接著我們修改m站的創建訂單接口,修改為新的Saga流程接口,然后編譯整個項目,啟動并測試一下:
?
?可以看到整個流程被順利的通過Saga管理器流轉完畢。接著我們嘗試注入一個故障,看看能否正常的被異常訂閱器捕獲到:
?編譯后再測試一下,我們可以看到事件異常訂閱器里已經成功捕獲到了這個異常:
?接著我們在創建訂單注入一個異常,讓商品庫存進行回滾操作,這里由于下單后異常商品補償所以在界面上是體現不出來的(當然在真實的業務場景中一般是下單5秒等待后查詢訂單創建情況,或發送一條站內信告知下單失敗),這里我們通過打印控制臺來演示:
接著我們運行并下單,追蹤商品服務的日志:
可以看到由于訂單創建失敗,saga觸發了補償事件并成功執行了補償。好了,關于saga的演示就到這里,可嘗試拉取最新的商城源碼執行即可看到效果。
總結
以上是生活随笔為你收集整理的通过Dapr实现一个简单的基于.net的微服务电商系统(十九)——分布式事务之Saga模式...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: BeetleX服务网关流量控制
- 下一篇: Windows 11 上大招!正式支持安