GitHub 标星 14000+,阿里开源的 SEATA 如何应用到极致?
戳藍字“CSDN云計算”關(guān)注我們哦!
作者簡介:袁鳴凱,家樂福技術(shù)總監(jiān), 高知特有限技術(shù)公司中國區(qū)架構(gòu)師,HP上海研發(fā)技術(shù)專家,夸客金融首席架構(gòu)師,現(xiàn)任家樂福中國區(qū)技術(shù)總監(jiān)。多年互聯(lián)網(wǎng)、企業(yè)級SOA、微服務(wù)、全渠道中臺方面的架構(gòu)設(shè)計實戰(zhàn)經(jīng)驗,曾先后參與過Metlife、CIGNA(信諾保險)內(nèi)部開發(fā)設(shè)計安全規(guī)范制定,以及參與過JAVA代碼標準規(guī)范的編寫。
出品 | CSDN(CSDNnews)
開篇
阿里把FESCAR開源了,開源后的名稱叫SEATA,目前GIT上已經(jīng)超13000+星。
可惜筆者遍歷全網(wǎng)段,無一篇是生產(chǎn)實用級說明。同時,GIT官網(wǎng)上的相關(guān)文檔缺失以及Sample都太HelloWorld了,無法應(yīng)用在真正的生產(chǎn)環(huán)境上。
于是,筆者結(jié)合了在6、7年前那時在那個MQ年代來解決分布式事務(wù)的經(jīng)驗,結(jié)合這次的SEATA(最新一次COMMIT在2019年12月底)來講一下最最新的也是目前最最潮的,如何解決分布式事務(wù)同時要考慮數(shù)據(jù)的最終一致性還要兼顧性能及高效、吐吞率時,作為阿里的這一套開源組合是怎么把它們做到極致的。
我們在全篇中會引用兩個例子:
第一個例子會用商品與庫存來模擬SEATA中的AT模式的分布式事務(wù)如何實現(xiàn);
第二個例子會使用兩個銀行間的跨行轉(zhuǎn)款來模擬SEATA中的TCC模式的分布式事務(wù)如何實現(xiàn)。
尤其,目前網(wǎng)上所有的對于SEATA的TCC講解只有一篇阿里原本的SEATA-tcc,它原本自帶的這個例子有如下幾個缺點:
?
若干個provider混在一起
provider和consumer混在一個項目
不支持nacos連接
不支持注解
然后網(wǎng)上所有的博客全部是圍繞著這篇helloworld級別的例子而講,其實很多都是抄襲,沒有一篇融入了自己的領(lǐng)會與思想,也沒有去把原本的例子按照生產(chǎn)級去做分離,這顯然會誤導(dǎo)很多讀者。
?
因此,我們這次就在原本阿里官方的例子上做生產(chǎn)級別的增強,使得它可以適應(yīng)你正要準備做的生產(chǎn)環(huán)境全模擬。
筆者在這邊并不想作太多長篇大論或者像網(wǎng)上所有的關(guān)于SEATA方面的文章那樣直接COPY PASTER一堆的所謂源碼來湊字數(shù)。筆者在這邊先講一下分布式事務(wù)的幾個重要概念,然后上生產(chǎn)上的實戰(zhàn)級代碼、應(yīng)用和剖析。
簡述分布式事務(wù)
在大型網(wǎng)站、并發(fā)流量高的網(wǎng)站,其實也不用太高,高到什么樣的一個層級需要考慮分布式事務(wù)呢。看一下下面這樣的一個描述:當你的生產(chǎn)環(huán)境的DB中假設(shè)有100個表,每個表數(shù)據(jù)量超過1億。這時已經(jīng)不是集群、主備、讀寫分離這么簡單的一件事就可以搞定了。
或者你可以通過集群、主備、讀寫分離來提高網(wǎng)站前端的響應(yīng)速度、吞吐量,可是你知道嗎?如果你真的是在這樣的一個量級的環(huán)境下呆過,你一定會對有一種情況不陌生!那就是當每個或者說主要單表超1億數(shù)據(jù)的就算你有再多的主或者從或者分離,只要你在“主數(shù)據(jù)庫”上作一條修改(刪除或者是更新),它就會有一個動作叫作“主從同步”。
然后你會發(fā)現(xiàn),這個主從同步會造成你的生產(chǎn)數(shù)據(jù)庫頻繁的“主從延遲”,我這邊指的是6主6從每個固態(tài)硬盤、64C CPU, 128GB這樣的配置的mySQL哦,如果你說“你們家狂有錢,都是512GB,128C的小型機”我也說這種情況最多是晚幾天會出現(xiàn)但遲早還會出現(xiàn)的。
當主從延遲出現(xiàn)后會發(fā)生什么呢?比如說你要刪一條數(shù)據(jù),主刪了然后開始去試圖同步從數(shù)據(jù)庫時,它會鎖庫、鎖表。假設(shè)你有1000個表(一個系統(tǒng)子模塊就有1000個表,一個大型網(wǎng)站有個30-40個子模塊對于大型網(wǎng)站來說很正常),每個表是上億數(shù)據(jù),任何一筆業(yè)務(wù)造成的JOIN操作時同時這個數(shù)據(jù)庫還在同步主從庫此時這個同步付出的代價是“6-8小時甚至更多”,同步失敗,這個數(shù)據(jù)的一致性就會越來越糟糕,再說白了,你的經(jīng)營數(shù)據(jù)就會受到嚴重影響。
嚴重到后來你數(shù)據(jù)庫的磁盤快不夠了、你要刪一點歷史數(shù)據(jù),你都不敢去操作了,因為你一操作,從凌晨到早上8:00左右你的主從同步同步不完,庫還鎖著此時就會影響到了你的線上、線下的業(yè)務(wù)了。這都是因為數(shù)據(jù)庫太大太重造成了連你要刪點歷史記錄、日志都不敢動的情況。這時IT會相當?shù)谋粍印?/p>
因此我們按照最優(yōu)的設(shè)計會保證單表數(shù)據(jù)不要超過2000萬條,此時我們就會去做表的業(yè)務(wù)垂直折分,于是就有了微服務(wù),它就會折成這樣的架構(gòu)。我們可以看到每一個數(shù)據(jù)庫與服務(wù)都會折開甚至同一個會員也會折成每1000萬數(shù)據(jù)一個庫對應(yīng)著一個微服務(wù)實例。
折完了情況當然得到了很大程度的改善,性能、實時性、吞吐量都得到了提高。然后碰到了下面類似的場景了,此時分布式事務(wù)的問題就出現(xiàn)了:
場景一
從上例我們可以看到,當商品主數(shù)據(jù)與庫存主數(shù)據(jù)被折開來后,就會發(fā)生一個數(shù)據(jù)一致性的問題。假設(shè)你在后臺對商品主數(shù)據(jù)做了一個添加或者是更新的動作那么整個系統(tǒng)也要求相應(yīng)的庫存數(shù)據(jù)與主數(shù)據(jù)必須一致,而一旦你拆成了微服務(wù)后主數(shù)據(jù)與庫存說白了其實已經(jīng)變成了兩個不同的系統(tǒng),每個系統(tǒng)都有自己的獨立DB。這時要解決的就是這2個系統(tǒng)間任何的一個更新操作失敗后為了維護數(shù)據(jù)的一致性那么這兩個相關(guān)的“服務(wù)”都需要回滾之前的操作。
?
場景二
銀行跨行轉(zhuǎn)款,假設(shè)帳戶A是工行,它通過工行向B的招商銀行帳戶轉(zhuǎn)帳過去。這個轉(zhuǎn)帳可是一個分布式事務(wù),要么成功要么失敗,不可能會有“部分成功”,這也是要求數(shù)據(jù)最終一致性的一個分布式事務(wù)的場景。
無論是場景一還是場景二,它就講究數(shù)據(jù)的最終一致性。對于這個問題的討論20多年前就已經(jīng)產(chǎn)生了,解決方案也早有了。
從最早的使用MQ的acknowledge模式在事務(wù)發(fā)起時先通知一下相關(guān)參與方,當所有相關(guān)參與方commit(成功)后主發(fā)起事務(wù)再顯示成功,如果有一方失敗,每一個參與方都會被通知到,此時再逐級回滾事務(wù)。到現(xiàn)代的分布式事務(wù)、跨表事務(wù)都是為了解決類似問題而誕生。
但是,傳統(tǒng)的做法在面對大流量大并的場景下,如果是類似最早的MQ的這種逐級通知方式它就會嚴重影響系統(tǒng)的交易時的性能,它的吞吐量就會受到制約。
但是在使用分布式事務(wù)的場景中,我們要求的是數(shù)據(jù)的最終一致性,它勢必會涉及到鎖庫、鎖表、鎖業(yè)務(wù)段,因此我們近20年來一直也都在數(shù)據(jù)的一致性和性能間試圖達到一個平衡。
這于是就誕生了幾大核心的解決方案,即:2PC(二階段)提交、3PC(三階段-在二階段上加了一個準備階段)與TCC(事務(wù)補償)機制。
對于這幾大核心解決方案的原理涉及到的CAP和PAXOS理論本文不做探討,網(wǎng)上太多相關(guān)論文了,如果你要應(yīng)付PPT架構(gòu)師面試可以去死記硬背,如果你要上生產(chǎn)代碼,那么我們接下去繼續(xù)說。這邊只做簡單敘述2PC與TCC的核心機制。
2PC(二階段)提交
1)第一階段:準備階段(prepare)
協(xié)調(diào)者通知參與者準備提交訂單,參與者開始投票。
協(xié)調(diào)者完成準備工作向協(xié)調(diào)者回應(yīng)Yes。
2)第二階段:提交(commit)/回滾(rollback)階段
協(xié)調(diào)者根據(jù)參與者的投票結(jié)果發(fā)起最終的提交指令。
如果有參與者沒有準備好則發(fā)起回滾指令。
應(yīng)用程序通過事務(wù)協(xié)調(diào)器向兩個庫發(fā)起prepare,兩個數(shù)據(jù)庫收到消息分別執(zhí)行本地事務(wù)(記錄日志),但不提交,如果執(zhí)行成功則回復(fù)yes,否則回復(fù)no。
事務(wù)協(xié)調(diào)器收到回復(fù),只要有一方回復(fù)no則分別向參與者發(fā)起回滾事務(wù),參與者開始回滾事務(wù)。
事務(wù)協(xié)調(diào)器收到回復(fù),全部回復(fù)yes,此時向參與者發(fā)起提交事務(wù)。如果參與者有一方提交事務(wù)失敗則由事務(wù)協(xié)調(diào)器發(fā)起回滾事務(wù)。
TCC事務(wù)補償式提交
TCC事務(wù)補償是基于2PC實現(xiàn)的業(yè)務(wù)層事務(wù)控制方案,它是Try(準備)、Confirm(提交)和Cancel(回滾)三個單詞的首字母,含義如下:
1) Try 檢查及預(yù)留業(yè)務(wù)資源完成提交事務(wù)前的檢查,并預(yù)留好資源。
2) Confirm 確定執(zhí)行業(yè)務(wù)操作,對try階段預(yù)留的資源正式執(zhí)行。
3) Cancel 取消執(zhí)行業(yè)務(wù)操作,對try階段預(yù)留的資源釋放。
1. Try
轉(zhuǎn)帳時,from庫和to庫分別進行帳戶號信息、余額信息、凍潔轉(zhuǎn)帳款項的操作,并鎖定資源。
2. Confirm
from帳戶把轉(zhuǎn)帳金額變成凍結(jié)金額,然后from帳戶扣除轉(zhuǎn)帳金額同時在操作時進行記錄鎖定。to帳戶把轉(zhuǎn)帳金額變成凍結(jié)金額,然后to帳戶余額+轉(zhuǎn)帳金額=剩余金額同時在操作時進行記錄鎖定。
3. Cancel階段
如果在from和to的各自業(yè)務(wù)中有任何一步拋錯或者說失敗,那么from和to的所有操作都要取消各自的操作;
于是
from操作把余額+被凍結(jié)的金額=原有from余額;原有凍結(jié)金額歸0;
to操作把凍結(jié)的金額-轉(zhuǎn)帳金額=原凍結(jié)金額,to操作把余額-轉(zhuǎn)帳=原有余額;
以上所有步驟必須實現(xiàn)“業(yè)務(wù)冪等”,什么叫“業(yè)務(wù)冪等”?
業(yè)務(wù)冪等
就是無論以上各自步驟如何操作,它們的業(yè)務(wù)關(guān)聯(lián)性必須相等,比如說:
from帳戶原有100,凍結(jié)字段為0元,欲轉(zhuǎn)出10元;
to帳戶原有100,凍結(jié)字段為0元,欲從from轉(zhuǎn)入10元;
那么以上一圈步驟輪下來有一步操作了,必須回到這個起始原點的狀態(tài)。這就需要我們的應(yīng)用程序做中間狀態(tài)的保留以及在程序代碼里預(yù)埋“業(yè)務(wù)補償”或者我們也把它稱為“反交易”邏輯。
好了,以上就是核心邏輯,不再展開更深入的原理,再展開就涉及到算法和理論了,我們這邊不是為了幫大家應(yīng)對面試而是幫大家真正的走上“生產(chǎn)環(huán)境”。因此下面就要開始show me the code了,我上面畫圖時其實已經(jīng)預(yù)留了整體的“架構(gòu)設(shè)計”了,因此下面就圍繞著以上的2個場景我們來使用springboot+dubbo+nacos+SEATA來實現(xiàn)它們。
SEATA+Nacos的組合
這邊需要介紹一下SEATA和nacos。
SEATA是什么?
SEATA的前身就是阿里螞蟻金服的:FESCAR,它就是為了解決又要實現(xiàn)事務(wù)的最終一致性,又要保證整體系統(tǒng)的高性能、高吞吐還要解決為了實現(xiàn)以上的事務(wù)2PC或者是TCC時不對已經(jīng)寫好的業(yè)務(wù)代碼進行太多的“侵入式破壞”來設(shè)計的。
寫本文時它的最高版本為1.0.0GA版,最后一次GIT提交時間在2天前即1月19號還在不斷有人提交Patch和fix bug。目前網(wǎng)上所有的sample要么跑不起來、要么不可用全部都是helloworld級別的東西,只可以單機玩無法跑生產(chǎn),而且都不是結(jié)合nacos的應(yīng)用的根本沒法實用,文檔又不全,因此我是把我實際生產(chǎn)中的經(jīng)驗直接分享給到各位的。
Nacos是什么?
這是一個相當成熟了的服務(wù)注冊發(fā)現(xiàn)+資源管理器,目前最新版為1.1.4。它是作為取代Zookeeper的地位的,而事實上它也正在取代Zookeeper,它相當?shù)某墒?#xff0c;比SEATA成熟,必竟它比SEATA出現(xiàn)的早嗎。
我們都知道SpringCloud和Dubbo都有自己的基于ZK的服務(wù)管理中心對吧?那玩意過時了,簡陋、又不易操作、不易運維,dubbo從2.6版開始已經(jīng)內(nèi)含nacos-registry了,因此越來越多的遠程服務(wù)注冊、服務(wù)自發(fā)現(xiàn)開始使用nacos了。
我們注意到了我在畫TCC事務(wù)時放置了一個“配置管理中心”,這是我故意放置的,此處使用的正是Nacos。
SEATA+Nacos是如何協(xié)調(diào)工作的?
SEATA就是事務(wù)管理中心,Transaction Management,簡稱TM。各個Dubbo微服務(wù)會被當成一種Resource被注冊進SEATA的“資源管理器”,我們簡稱RM。
那有Nacos什么事?
大家想一下我上面說過的話,要解決的是“又要實現(xiàn)事務(wù)的最終一致性,又要保證整體系統(tǒng)的高性能、高吞吐還要解決為了實現(xiàn)以上的事務(wù)2PC或者是TCC時不對已經(jīng)寫好的業(yè)務(wù)代碼進行太多的侵入式破壞“。注意此處的“不對寫好的業(yè)務(wù)代碼進行太多的侵入式破壞”這幾個字眼。
如何不侵入式破壞?類反射+回調(diào)->Spring+回調(diào),Yeah!
同時,各個微服務(wù)就是一套獨立的系統(tǒng),怎么讓遠程的2個系統(tǒng)或者是多個系統(tǒng)互相類反射?
那么我們的做法起源于最最最最古老的J2EE v1.2規(guī)范中的EJB的Session Bean的設(shè)計理念。
它的原理其實就是各個EJB(微服務(wù),那時叫SOA)把自己的接口名的全路徑以JNDI的尋址方式注冊進J2EE容器(要玩J2EE可不能用Tomcat哦,Tomcat永遠只是一個webcontainer,要玩J2EE必須使用Websphere,Weblogic或者是開源的JBOSS,spring+mybatis=Thisis not a J2EE)。然后不同參于的“服務(wù)組件”都通過這個JNDI來進行尋址并互相通知(調(diào)用)。
Dubbo的作者(包括他的開發(fā)團隊)曾提出過這么一個思想:我覺得事務(wù)的管理不應(yīng)該屬于Dubbo框架,Dubbo只需實現(xiàn)可被事務(wù)管理即可,像JDBC和JMS都是可被事務(wù)管理的分布式資源,Dubbo只要實現(xiàn)相同的可被事務(wù)管理的行為,比如可以回滾,?其它事務(wù)的調(diào)度,都應(yīng)該由專門的事務(wù)管理器實現(xiàn)。FESCAR就是在這么一個前提下在被架構(gòu)出來的。
SEATA, Nacos, Dubbo三者間就正是這么一種羈絆。
?
Dubbo是微服務(wù)核心服務(wù)提供者;
Dubbo與Dubbo間的通信用的是遠程接口,它需要一個遠程的自動服務(wù)發(fā)現(xiàn)、注冊管理中心,于是就有了Nacos;
而SEATA是一種TM,它通過的是通過去注冊管理中心里尋找相應(yīng)的RM的注冊地址,然后通過遠程消息+異步回調(diào)機制來完成RM內(nèi)的相關(guān)事務(wù)的統(tǒng)一協(xié)調(diào)與管理;
知道了這三者的關(guān)系后我們下面開始就用實例來實現(xiàn)這一次魔幻之旅吧。
SEATA使用講解
所有工程源碼位于我的git上,各位可以下載了后學(xué)習(xí)用:
https://github.com/mkyuangithub/mkyuangithub
SEATA Server端與SEATA Client端的選擇
首先我們使用SEATA-server v0.9.0版本,它是一個中間件,啟動在那邊就可以了。但是對于它的客戶端我們必須要注意使用seata-all version 1.0.0。
沒錯,server端和client端版本不一致,你沒看錯。
Server版我們還不能完全使用SEATA-server 1.0GA,如前文所提,它還不是一個穩(wěn)定版本,而且還在不斷的修相應(yīng)的bug,0.9.0是一個比較穩(wěn)定的版本也支持HA機制,是可以用來上生產(chǎn)的。
Client版我們堅持使用v1.0.0,那是因為0.8.0~0.9.0間的client版有Bug,這個Bug如果你不是真正的生產(chǎn)級別應(yīng)用是發(fā)現(xiàn)不了的,因為如果你直接跑它的官方Sample那是沒有任何問題,如果你要把多個微服務(wù)分開來布署,并且每個微服務(wù)還要有自己的DB(這才是生產(chǎn)級應(yīng)用),然后通過nacos靠SEATA來協(xié)調(diào)不同微服務(wù)間的事務(wù),它會造成你一個微服務(wù)提交事務(wù)后整個后臺事務(wù)“死鎖”,這也在git上被SEATA相關(guān)貢獻者給證明了,這個bug相關(guān)的0.8.1~0.9.0的client端還在待修復(fù)中,而v1.0.0目前沒有這個問題。
SEATA Server的配置與啟動詳解
目前全網(wǎng)所有的講解SEATA的例子均出于兩篇文章,這兩篇一篇是阿里內(nèi)部開發(fā)人員通過SEATA中相應(yīng)的源碼來說明這對搞原理的人來說可以做做相應(yīng)的學(xué)術(shù)研究,另一篇是等于把git上的sample原份不動的抄了一下也沒有太多實際的生產(chǎn)價值,并且會誤導(dǎo)真正要去關(guān)注其技術(shù)的讀者。
我們這邊就拿SEATA-Server0.9.0+Nacos1.1.4來做我們實例的講解。
兩個者的部署運行包請去它們各自的官方git上下載,我這邊也給出了這兩者的官網(wǎng)下載地址:
https://github.com/seata/seata/releases/download/v0.9.0/seata-server-0.9.0.zip
https://github.com/alibaba/nacos/releases/download/1.1.4/nacos-server-1.1.4.zip
要下載tar包的讀者們可直行去下面這兩個網(wǎng)址下載:
https://github.com/seata/seata/releases
https://github.com/alibaba/nacos/releases
?
nacos怎么用我已經(jīng)在上一篇阿里的nacos+springboot+dubbo2.7.3集成以及統(tǒng)一處理異常的兩種方式看詳細有講述了。因此我們主要就來看SEATA的用法。我要強調(diào)的是本示例是推薦大家把nacos和SEATA全部部署在linux上進行使用的,為什么后面你就會知道了。
Seata-Server0.9.0下載后打開它是這么樣的一個目錄結(jié)構(gòu)
你可以直接跑到bin目錄下運行./seata-server.sh,但是我請你先不要這么干,我們進conf目錄中進行一個簡單的不超過5分鐘的配置。如果你不看這章直接只管運行你會變成網(wǎng)上那些根本無法把它運用到實戰(zhàn)的人群之一。
我們進入conf目錄
SEATA支持兩種運行模式,我們要使用nacos運行模式而不是file。
SEATA有兩種運行模式:
它可以依賴于nacos或者是zk、redis、springcloud的eureka的方式
它也可以不依賴于任何第三方,僅靠client端與SEATA間進行通訊,它使用的就是這個conf目錄內(nèi)的file.conf了
因此,如果你要使用nacos來做SEATA的“托載”,就請把這個file.conf刪了,而網(wǎng)上所有的教程竟然都是保留著它。
剁了它吧!
我們用的是SEATA+nacos的這種模式來跑我們的實例的。
至于為什么使用file模式?它和其它模式有什么不一樣的地方呢?
我們打開conf目錄下的registry.conf目錄來一窺究竟吧,下面是registry.conf的內(nèi)容:
registry?{#?file?、nacos?、eureka、redis、zk、consul、etcd3、sofatype?=?"file"nacos?{serverAddr?=?"localhost"namespace?=?""cluster?=?"default"}eureka?{serviceUrl?=?"http://localhost:8761/eureka"application?=?"default"weight?=?"1"}redis?{serverAddr?=?"localhost:6379"db?=?"0"}zk?{cluster?=?"default"serverAddr?=?"127.0.0.1:2181"session.timeout?=?6000connect.timeout?=?2000}consul?{cluster?=?"default"serverAddr?=?"127.0.0.1:8500"}etcd3?{cluster?=?"default"serverAddr?=?"http://localhost:2379"}sofa?{serverAddr?=?"127.0.0.1:9603"application?=?"default"region?=?"DEFAULT_ZONE"datacenter?=?"DefaultDataCenter"cluster?=?"default"group?=?"SEATA_GROUP"addressWaitTime?=?"3000"}file?{name?=?"file.conf"} }config?{#?file、nacos?、apollo、zk、consul、etcd3type?=?"file"nacos?{serverAddr?=?"localhost"namespace?=?""}consul?{serverAddr?=?"127.0.0.1:8500"}apollo?{app.id?=?"seata-server"apollo.meta?=?"http://192.168.1.204:8801"}zk?{serverAddr?=?"127.0.0.1:2181"session.timeout?=?6000connect.timeout?=?2000}etcd3?{serverAddr?=?"http://localhost:2379"}file?{name?=?"file.conf"}
Registry{}段與Config{}段。大家要注意的就是兩個段:
這么一陀都要么?可能么?肯定我們只選用其中一種配置,這邊我們用的就是nacos模式,于是我們把這個registry.conf設(shè)置成我下面這樣即可,看到了吧,registry里的type設(shè)成nacos,config里也設(shè)成nacos,并指定nacos運行的地址,因此你的nacos一定先要運行起來。
這邊再多插一個番外,nacos在windows下你直接使用startup.cmd啟動是沒問題的,它默認會在windows下使用standalone模式啟動,而在linux下你不能直接startup.sh,因為在linux下它會默認使用cluster模式啟動,因此在linux下你需要使用如下啟動命令來啟動你的nacos:
./startup.sh?-m?standaloneregistry?{type?=?"nacos"nacos?{serverAddr?=?"192.168.56.101"namespace?=?"public"cluster?=?"default"} }config?{type?=?"nacos"nacos?{serverAddr?=?"192.168.56.101"namespace?=?"public"cluster?=?"default"} }因此一旦設(shè)置了type=nacos后就請把file.conf文件刪除了即可,不要用file了。為什么這邊的nacos的serverAddr不要寫成:192.168.56.101:8848呢?因為SEATA目前寫死掉如果是nacos那么代碼取端口“寫死8848”,相應(yīng)后期1.0.0GA版本后一定會做成端口動態(tài)設(shè)置的,不過這不影響我們使用的。
因為SEATA如果通過的是file模式運行,客戶端與SEATA server端不是通過注冊中心去做服務(wù)連接和自動發(fā)現(xiàn)的,它是讓擁有seata-client端的jar代碼與seata server端直接通過tcp直連的模式的。
因為我們使用的是type=nacos,因此我們需要把SEATA的相關(guān)服務(wù)給注冊到nacos中。
請使用純文本編輯工具(一定是純文本編輯工具哦)在seata/conf目錄下建立一個叫“nacos.config.txt”的文件。對,你沒看錯文件名就是“nacos-config.txt”文件,而且一個字不能錯。
內(nèi)容如下,注意前方一陀參數(shù)預(yù)警,不過我會把重要的參數(shù)都給說一遍的。
transport.type=TCP transport.server=NIO transport.heartbeat=true transport.thread-factory.boss-thread-prefix=NettyBoss transport.thread-factory.worker-thread-prefix=NettyServerNIOWorker transport.thread-factory.server-executor-thread-prefix=NettyServerBizHandler transport.thread-factory.share-boss-worker=false transport.thread-factory.client-selector-thread-prefix=NettyClientSelector transport.thread-factory.client-selector-thread-size=1 transport.thread-factory.client-worker-thread-prefix=NettyClientWorkerThread transport.thread-factory.boss-thread-size=1 transport.thread-factory.worker-thread-size=8 transport.shutdown.wait=3 service.vgroup_mapping.demo-tx-grp=default service.default.grouplist=192.168.56.101:8091 service.enableDegrade=false service.disable=false service.max.commit.retry.timeout=-1 service.max.rollback.retry.timeout=-1 client.async.commit.buffer.limit=10000 client.lock.retry.internal=10 client.lock.retry.times=30 client.lock.retry.policy.branch-rollback-on-conflict=true client.table.meta.check.enable=true client.report.retry.count=5 client.tm.commit.retry.count=1 client.tm.rollback.retry.count=1 store.mode=db store.file.dir=file_store/data store.file.max-branch-session-size=16384 store.file.max-global-session-size=512 store.file.file-write-buffer-cache-size=16384 store.file.flush-disk-mode=async store.file.session.reload.read_size=100 store.db.datasource=druid store.db.db-type=mysql store.db.driver-class-name=com.mysql.jdbc.Driver store.db.url=jdbc:mysql://192.168.56.101:3306/seata?useUnicode=true store.db.user=seata store.db.password=111111 store.db.min-conn=1 store.db.max-conn=3 store.db.global.table=global_table store.db.branch.table=branch_table store.db.query-limit=100 store.db.lock-table=lock_table recovery.committing-retry-period=1000 recovery.asyn-committing-retry-period=1000 recovery.rollbacking-retry-period=1000 recovery.timeout-retry-period=1000 transaction.undo.data.validation=true transaction.undo.log.serialization=jackson transaction.undo.log.save.days=7 transaction.undo.log.delete.period=86400000 transaction.undo.log.table=undo_log transport.serialization=seata transport.compressor=none metrics.enabled=false metrics.registry-type=compact metrics.exporter-list=prometheus metrics.exporter-prometheus-port=9898 support.spring.datasource.autoproxy=false媽媽呀,這么長一陀,不難理解,它其實也分幾個段來理解的。
SEATA連接DB
SEATA一旦配置了nacos后它會在nacos中生成一個配置管理的服務(wù),這個服務(wù)是需要依托于DB來做持久的,這時你需要把它的store.mode改成db。
此處你看到了,對于db你可以直接使用druid來作你的datasource,它自帶druid客戶端了:
store.db.datasource=druid store.db.db-type=mysql store.db.driver-class-name=com.mysql.jdbc.Driver store.db.url=jdbc:mysql://192.168.56.101:3306/seata?useUnicode=true store.db.user=seata store.db.password=111111 store.db.min-conn=1 store.db.max-conn=3配完了db不要忘記在db中手動運行conf/目錄下的一個叫db_store.sql的文件,它會生成SEATA所需運行的3個表。
--?the?table?to?store?GlobalSession?data drop?table?if?exists?`global_table`; create?table?`global_table`?(`xid`?varchar(128)??not?null,`transaction_id`?bigint,`status`?tinyint?not?null,`application_id`?varchar(32),`transaction_service_group`?varchar(32),`transaction_name`?varchar(128),`timeout`?int,`begin_time`?bigint,`application_data`?varchar(2000),`gmt_create`?datetime,`gmt_modified`?datetime,primary?key?(`xid`),key?`idx_gmt_modified_status`?(`gmt_modified`,?`status`),key?`idx_transaction_id`?(`transaction_id`) );--?the?table?to?store?BranchSession?data drop?table?if?exists?`branch_table`; create?table?`branch_table`?(`branch_id`?bigint?not?null,`xid`?varchar(128)?not?null,`transaction_id`?bigint?,`resource_group_id`?varchar(32),`resource_id`?varchar(256)?,`lock_key`?varchar(128)?,`branch_type`?varchar(8)?,`status`?tinyint,`client_id`?varchar(64),`application_data`?varchar(2000),`gmt_create`?datetime,`gmt_modified`?datetime,primary?key?(`branch_id`),key?`idx_xid`?(`xid`) );--?the?table?to?store?lock?data drop?table?if?exists?`lock_table`; create?table?`lock_table`?(`row_key`?varchar(128)?not?null,`xid`?varchar(96),`transaction_id`?long?,`branch_id`?long,`resource_id`?varchar(256)?,`table_name`?varchar(32)?,`pk`?varchar(36)?,`gmt_create`?datetime?,`gmt_modified`?datetime,primary?key(`row_key`) );這邊的server.vgroup_mapping指的就是分布式事務(wù)生效的那個“范圍”,看下圖就懂了:
這邊的事務(wù)作用group就是我們的service.vgroup_mappiing然后“小點點”后面的這個名字了,這個名字你必須和你在SEATA client端帶的事務(wù)group名一致。
什么是SEATA客戶端,喏,上圖中的2個dubbo和那個消費者(consumer)就是SEATA的客戶端,seata-server0.9.0就是server端。
service.default.grouplist=192.168.56.101:8091是什么意思?
很簡單,就是為這個事務(wù)范圍它會開啟一個基于netty的遠程服務(wù),它是一個遠程消息服務(wù)。當任何一段拋出Exception,這個消息服務(wù)就會觸發(fā)它的onMessage方法中的相應(yīng)的邏輯,它要做的事就是去nacos中尋找注冊進去的provider端和consumer端,然后通過回調(diào)接口來通知各個事務(wù)參與者要么提交,要么回滾用的。
因此這邊的ip或者是hostname就是指向SEATA server端自身的ip即可。
其實這么長一陀參數(shù),我們只要講這幾個核心參數(shù)即可,其它的參數(shù)?你們自己去看看,那都是用來調(diào)解性能閥值的,你配過druid的相關(guān)閥值參數(shù)就會配那余下的幾個參數(shù),這個可以留給大家根據(jù)自身部署環(huán)境去做調(diào)整,沒有一個絕對的答案。
全配置完后我們要開始啟動SEATA server了。等等等等!還有一個步驟!
你配完這個nacos-config.txt后不代表SEATA啟動時就會加載它,不會的哦,這個文件是要倒過來先“注入nacos”的,而且是在SEATA啟動前先注入給到nacos的,因此你的nacos必須在第1步驟中就啟動起來。
我們在SEATA的conf目錄下看到有這么兩個文件,一個叫nacos-config.sh一個叫nacos-config.py。它們的作用就是:
nacos-config.sh?192.168.56.101此時SEATA會自動找到命令后的ip加上:8848端口號,然后找到conf目錄下的nacos-config.txt文件,然后把nacos-config.txt文件里的那一陀內(nèi)容全部注入到nacos中去的。
這邊重提一下上面說的為什么我推薦大家要在linux下用nacos和SEATA,你們看到了,它沒有nacos-config.bat哈,你要在windows下用要么你裝個cygwin要么裝一個python for windows環(huán)境。
此處千萬記得,在nacos-config.txt文件最后不要留空行啊,因為nacos-config.sh也會把空行當成null值給set進nacos的。
或者你在linux下完成了我上述這些步驟后,再把個SEATA copy回到windows下,然后通過seata/bin下的seata-server.bat運行哈?這不折騰嗎?
啟動前最后要配一個東西,什么?logback.xml文件。
它里面指定了SEATA的log生成在哪兒,要不,它會生成在這些地方 :
windows下默認的seata log生成在c盤的user目錄下;
linux就生成在/home/登錄用戶下的logs/seata了;
全設(shè)完了,我們就可以通過以下命令來啟動seata了。
./seata-server.shSEATA基于2PC的AT模式的生產(chǎn)實例
所有代碼我已經(jīng)放置在了我的git上了,地址在這:
https://github.com/mkyuangithub/mkyuangithub
我們在這個例子中會設(shè)計2個dubbo service:
seata-product-service
seata-stock-service
設(shè)計一個controller,seata-demo-consumer,它會通過這兩個dubbo service分別在product庫和stock庫同時插入一條記錄,如:
{"productname"?:?"tea","stock"?:?10000 }要么成功全部提交事務(wù);這兩個dubbo service可是分別連著自己的物理數(shù)據(jù)庫,連鏈接都不同的數(shù)據(jù)庫的,并且這兩個dubboservice啟動在兩個不同的jvm實例中,以此來完成:
要么在操作過程中有任何Exception拋出那么兩個Dubbo Service全部回滾各自的業(yè)務(wù)以保持分布式事務(wù)的最終數(shù)據(jù)一致性;
先不急著分析代碼,們在做代碼前筆者要把SEATA結(jié)合spring boot和nacos的坑好好的給各位快速擼一把。
我們做1個consumer來溝通這兩個dubbo service的架構(gòu)入手!
工程整體介紹
我們一共會分成三個工程:
seata-product-service,連接著productDB
seata-stock-service,連接著stockDB
seata-demo-consumer,它是一個springboot controller,在一個service方法內(nèi)同時調(diào)用product與stock的dubbo service來完成t_product和t_stock表的插入動作
工程搭建時的通用配置注意事項
每一個工程記得要在它們的src/main/resources目錄下放置一個registry.conf,它用來讓每個工程內(nèi)的seata-all的客戶端可以訪問到SEATA。三個工程都要放。
每個工程要有registry.conf文件
seata-product-service與seata-stock-service工程內(nèi)的registry.conf會調(diào)用工程的maven中的:
<dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId></dependency>registry.conf文件內(nèi)容如下把自身注冊進RM(Resource Manager)里。如果你的type選用的是file那么你還要在每個工程的src/main/resources內(nèi)放置一個file.conf的文件。文件內(nèi)容必須和seata-server端的conf/file.conf文件內(nèi)容一致。目前網(wǎng)上的所謂例子都是基于file來做的,它不會使用到注冊中心的概念只會在SEATA所屬的客戶端和seata-server端使用本地開啟netty然后通過8091端口來互傳,因此我才說這不是生產(chǎn)級的應(yīng)用。
registry.conf文件內(nèi)容如下:
registry?{type?=?"nacos"nacos?{serverAddr?=?"192.168.56.101"namespace?=?"public"cluster?=?"default"}}config?{type?=?"nacos"nacos?{serverAddr?=?"192.168.56.101"namespace?=?""} }
而seata-demo-consumer里有一個普通的service方法,它是一個dubbo consumer,在它的方法內(nèi)會有一個這樣的標記。
這個@GlobalTransactional用來通過nacos尋址返向找到product和stock內(nèi)的dubbo provider然后再回調(diào)相應(yīng)的provider service中的涉及到db的transaction來執(zhí)行回滾或提交。它只安置于調(diào)用若干個dubboservice的業(yè)務(wù)service這一層,各dubbo service內(nèi)不需要加這個參數(shù)(除非你有事嵌套和傳播的需求)。
為此,我們還需要多做一步,那就是把SEATA的transaction注入到有DB涉及到的dubbo provider工程內(nèi),見下文。
使用SEATA datasource proxy來代理每個dubbo的事務(wù)
在dubbo工程內(nèi),有涉及到操作DB的,都需要在連接dataSource時這樣進行配置,此處我們使用spring boot全注解方式來連接DB。
package?org.sky.product.config;import?org.springframework.boot.context.properties.ConfigurationProperties; import?org.springframework.context.annotation.Bean; import?org.springframework.context.annotation.Configuration; import?org.springframework.context.annotation.Primary; import?org.springframework.jdbc.core.JdbcTemplate; import?org.springframework.jdbc.datasource.DataSourceTransactionManager; import?com.alibaba.druid.pool.DruidDataSource; import?io.seata.rm.datasource.DataSourceProxy; import?io.seata.spring.annotation.GlobalTransactionScanner;@Configuration public?class?DruidConfig?{@Bean@ConfigurationProperties(prefix?=?"spring.datasource")public?DruidDataSource?druidDataSource()?{return?new?DruidDataSource();}/***?init?datasource?proxy*?*?@Param:?druidDataSource?datasource?bean?instance*?@Return:?DataSourceProxy?datasource?proxy*/@Beanpublic?DataSourceProxy?dataSourceProxy(DruidDataSource?druidDataSource)?{return?new?DataSourceProxy(druidDataSource);}@Beanpublic?DataSourceTransactionManager?transactionManager(DataSourceProxy?dataSourceProxy)?{return?new?DataSourceTransactionManager(dataSourceProxy);}/***?init?jdbc?template?by?using?the?dataSourceProxy*?*?@Return:?JdbcTemplate*/@Primary@Beanpublic?JdbcTemplate?dataSource(DataSourceProxy?dataSourceProxy)?{return?new?JdbcTemplate(dataSourceProxy);}/***?init?global?transaction?scanner**?@Return:?GlobalTransactionScanner*/@Beanpublic?GlobalTransactionScanner?globalTransactionScanner()?{return?new?GlobalTransactionScanner("seata-product-service",?"demo-tx-grp");}}每個GlobalTransactionScanner中相應(yīng)參數(shù)講解
以上代碼中最后一個@Bean配置內(nèi)有這么一句:
return newGlobalTransactionScanner("seata-product-service","demo-tx-grp");
此處的第一個參數(shù)是你的dubbo-application-name,它對應(yīng)于你的application.properties文件內(nèi)的如下這么一塊內(nèi)容。每個dubbo provider的name都不一樣的。
dubbo.protocol.id=dubbo dubbo.protocol.name=dubbo dubbo.application.name=seata-product-service dubbo.application.id=seata-product-service
此處的第二個參數(shù)就是“全局事務(wù)的范圍”,它也對應(yīng)著我們在SEATA的nacos-config.txt文件內(nèi)配置的這一行,要是這個名字對不起來那么各SEATA client端無法通過seata-server:8091的netty端口連上SEATA的service group,客戶端也會頻繁拋 can not connect toserver的出錯信息的:
各SEATA客戶端對于回滾的設(shè)置
每個SEATA客戶端連接著的DB庫內(nèi)要有一個undo_log表,這個表是供SEATA客戶端自己調(diào)用的,它的建表語句就位于seata/conf目錄內(nèi)的db_undo_log.sql。
有參于事務(wù)的客戶端的業(yè)務(wù)DB庫內(nèi)就必須要有一個這樣的表,SEATA在回調(diào)dubbo provider時在提交和回滾時是會自動用到這張表的。
CREATE?TABLE?`undo_log`?(`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT,`branch_id`?bigint(20)?NOT?NULL,`xid`?varchar(100)?NOT?NULL,`context`?varchar(128)?NOT?NULL,`rollback_info`?longblob?NOT?NULL,`log_status`?int(11)?NOT?NULL,`log_created`?datetime?NOT?NULL,`log_modified`?datetime?NOT?NULL,`ext`?varchar(100)?DEFAULT?NULL,PRIMARY?KEY?(`id`),UNIQUE?KEY?`ux_undo_log`?(`xid`,`branch_id`) )?ENGINE=InnoDB?AUTO_INCREMENT=1?DEFAULT?CHARSET=utf8;seata-product-service工程全代碼
和之前的工程一樣,它們共用一個parent即nacos-parent,這邊給出seata-product-service工程的pom.xml文件。這邊要記的就是SEATA的客戶端一定要用1.0.0,這些版本號的工作都是在我的parent工程內(nèi)制定的。
<properties><java.version>1.8</java.version><spring-boot.version>1.5.15.RELEASE</spring-boot.version><dubbo.version>2.7.3</dubbo.version><curator-framework.version>4.0.1</curator-framework.version><curator-recipes.version>2.8.0</curator-recipes.version><druid.version>1.1.20</druid.version><guava.version>27.0.1-jre</guava.version><fastjson.version>1.2.59</fastjson.version><dubbo-registry-nacos.version>2.7.3</dubbo-registry-nacos.version><nacos-client.version>1.1.4</nacos-client.version><mysql-connector-java.version>5.1.46</mysql-connector-java.version><disruptor.version>3.4.2</disruptor.version><aspectj.version>1.8.13</aspectj.version><spring.data.redis>1.8.14-RELEASE</spring.data.redis><skycommon.version>0.0.1-SNAPSHOT</skycommon.version><seata.version>1.0.0</seata.version><netty.version>4.1.42.Final</netty.version><nacos.spring.version>0.1.4</nacos.spring.version><lombok.version>1.16.22</lombok.version><javax.servlet.version>3.1.0</javax.servlet.version><mybatis.version>3.4.5</mybatis.version><mybatis.spring.version>1.3.1</mybatis.spring.version><maven.compiler.source>${java.version}</maven.compiler.source><maven.compiler.target>${java.version}</maven.compiler.target><compiler.plugin.version>3.8.1</compiler.plugin.version><war.plugin.version>3.2.3</war.plugin.version><jar.plugin.version>3.1.2</jar.plugin.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding></properties>下面是seata-product-service的pom.xml全內(nèi)容?
相應(yīng)的t_product建表語句如下:
CREATE?TABLE?`t_product`?(`product_id`?int(16)?NOT?NULL?AUTO_INCREMENT,`product_name`?varchar(45)?COLLATE?utf8_bin?DEFAULT?NULL,PRIMARY?KEY?(`product_id`) )?;application.properties
項目使用springboot全注解,來看看項目的application.properties文件的內(nèi)容吧。
以下是項目的自動注入相關(guān)資源到pring用的DruidConfig.java文件
package?org.sky.product.config;import?org.springframework.boot.context.properties.ConfigurationProperties; import?org.springframework.context.annotation.Bean; import?org.springframework.context.annotation.Configuration; import?org.springframework.context.annotation.Primary; import?org.springframework.jdbc.core.JdbcTemplate; import?org.springframework.jdbc.datasource.DataSourceTransactionManager; import?com.alibaba.druid.pool.DruidDataSource; import?io.seata.rm.datasource.DataSourceProxy; import?io.seata.spring.annotation.GlobalTransactionScanner;@Configuration public?class?DruidConfig?{@Bean@ConfigurationProperties(prefix?=?"spring.datasource")public?DruidDataSource?druidDataSource()?{return?new?DruidDataSource();}/***?init?datasource?proxy*?*?@Param:?druidDataSource?datasource?bean?instance*?@Return:?DataSourceProxy?datasource?proxy*/@Beanpublic?DataSourceProxy?dataSourceProxy(DruidDataSource?druidDataSource)?{return?new?DataSourceProxy(druidDataSource);}@Beanpublic?DataSourceTransactionManager?transactionManager(DataSourceProxy?dataSourceProxy)?{return?new?DataSourceTransactionManager(dataSourceProxy);}/***?init?jdbc?template?by?using?the?dataSourceProxy*?*?@Return:?JdbcTemplate*/@Primary@Beanpublic?JdbcTemplate?dataSource(DataSourceProxy?dataSourceProxy)?{return?new?JdbcTemplate(dataSourceProxy);}/***?init?global?transaction?scanner**?@Return:?GlobalTransactionScanner*/@Beanpublic?GlobalTransactionScanner?globalTransactionScanner()?{return?new?GlobalTransactionScanner("seata-product-service",?"demo-tx-grp");}}可以看到它開了一個dubbo providerservice,dubbo.application.name申明了該dubbo服務(wù)的名稱。并且這個dubbo provider service會運行在20880端口。
此處我們使用jdbcTemplate來做全工程的SQL訪問。大家可以看到它使用了SEATA帶的DatasourceProxy代理了DruiDatasource,然后再用GlobalTransationScanner來代理Spring的TransactionManager。
ProductDAO.java
package?org.sky.product.dao;import?org.sky.exception.DemoRpcRunTimeException;public?interface?ProductDAO?{public?long?addNewProduct(String?productName)?throws?DemoRpcRunTimeException; }ProductDAOImpl
然后我們給出ProductBizService,這個BizService只是一個普通的@Service,它會被ProductDubboService調(diào)用,整個SEATA的事務(wù)會通過這樣的事務(wù)傳播路徑Dubbo->Service->Dao一路傳進來,因此對于參于全局SEATA事務(wù)的@Service類就不需要再加@Transactional這樣的注解了。
package?org.sky.product.service.biz;import?org.sky.exception.DemoRpcRunTimeException; import?org.sky.product.dao.ProductDAO; import?org.sky.product.service.biz.ProductBizService; import?org.sky.service.BaseService; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.stereotype.Service;@Service public?class?ProductBizServiceImpl?extends?BaseService?implements?ProductBizService?{@AutowiredProductDAO?productDAO;@Overridepublic?long?addProduct(String?productName)?throws?DemoRpcRunTimeException?{long?newProdId?=?0;try?{newProdId?=?productDAO.addNewProduct(productName);}?catch?(Exception?e)?{logger.error("error?occured?on?Biz?Service?Side:?"?+?e.getMessage(),?e);throw?new?DemoRpcRunTimeException("error?occured?on?Biz?Service?Side:?"?+?e.getMessage(),?e);}return?newProdId;}}ProductDubboServiceImpl.java
下面是product的微服務(wù)ProductDubboService的實現(xiàn)類,我們把ProductDubboService這種用于遠程訪問的Dubbo的Inerface(在EJB理念里我們管這種叫殘根)都放置在了sky-common工程里了。下面給出ProductDubboService的實現(xiàn)類。
package?org.sky.product.service.dubbo;import?org.apache.dubbo.config.annotation.Service; import?org.sky.exception.DemoRpcRunTimeException; import?org.sky.product.service.biz.ProductBizService; import?org.sky.product.service.dubbo.ProductDubboService; import?org.sky.service.BaseService; import?org.springframework.beans.factory.annotation.Autowired;@Service(version?=?"1.0.0",?interfaceClass?=?ProductDubboService.class,?timeout?=?30000,?loadbalance?=?"roundrobin") public?class?ProductDubboServiceImpl?extends?BaseService?implements?ProductDubboService?{@AutowiredProductBizService?productBizService;@Overridepublic?long?addProduct(String?productName)?throws?DemoRpcRunTimeException?{long?result?=?0;try?{result?=?productBizService.addProduct(productName);logger.info("======>insert?into?t_product?with?product_id->"?+?result?+?"?and?productname->"?+?productName+?"?successfully");}?catch?(Exception?e)?{logger.error("error?occured?on?insert?product?with:?product_id->\"?+?newProdId?+?\"?and?productname->\"\r\n"+?"?????????????????+?productName"?+?e.getMessage(),?e);throw?new?DemoRpcRunTimeException("error?occured?on?insert?product?with:?product_id->\"?+?newProdId?+?\"?and?productname->\"\r\n"+?"?????????????????+?productName"?+?e.getMessage(),e);}return?result;}}
最后,就是用于運行該工程用的springboot的Application.java文件了
運行起來后,它會把這個dubbo providerservice自動注冊進nacos去。
因此啟動順序一定記得是mysql->nacos->seata再啟動各個dubbo provider以及consumer。
seata-stock-service
pom.xml
同seata-product-service項目。
<project?xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.sky.demo</groupId><artifactId>seata-stock-service</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>seata-stock-service</name><description>Demo?project?Dubbo+Nacos+Seata</description><parent><groupId>org.sky.demo</groupId><artifactId>nacos-parent</artifactId><version>0.0.1-SNAPSHOT</version></parent><properties><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-core</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-spring</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--?Dubbo?Registry?Nacos?--><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-registry-nacos</artifactId></dependency><dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId></dependency><dependency><groupId>org.sky.demo</groupId><artifactId>skycommon</artifactId><version>${skycommon.version}</version></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId></dependency><dependency><groupId>com.alibaba.boot</groupId><artifactId>nacos-config-spring-boot-starter</artifactId><exclusions><exclusion><artifactId>nacos-client</artifactId><groupId>com.alibaba.nacos</groupId></exclusion></exclusions></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><finalName>${project.artifactId}</finalName><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><jvmArguments>-Dfile.encoding=UTF-8</jvmArguments></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-war-plugin</artifactId><version>2.6</version><configuration><failOnMissingWebXml>false</failOnMissingWebXml></configuration></plugin></plugins><resources><resource><directory>src/main/resources</directory><excludes><exclude>application*.properties</exclude></excludes></resource><resource><directory>src/main/webapp</directory><targetPath>META-INF/resources</targetPath><includes><include>**/**</include></includes></resource><resource><directory>src/main/resources</directory><filtering>true</filtering><includes><include>application.properties</include></includes></resource></resources></build> </project>
建表語句:
application.properties
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driverClassName=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://192.168.56.101:3306/stock?useUnicode=true&characterEncoding=utf-8&useSSL=false spring.datasource.username=stock spring.datasource.password=111111 spring.datasource.initialize=falsespring.redis.database=0?? spring.redis.host=192.168.56.101 spring.redis.port=6379 spring.redis.password=111111 spring.redis.pool.max-active=10?? spring.redis.pool.max-wait=-1?? spring.redis.pool.max-idle=10? spring.redis.pool.min-idle=0?? spring.redis.timeout=1000 #tomcat?settings server.port=8081 server.tomcat.maxConnections=300 server.tomcat.maxThreads=300 server.tomcat.uriEncoding=UTF-8 server.tomcat.maxThreads=300 server.tomcat.minSpareThreads=150 server.connectionTimeout=20000 server.tomcat.maxHttpPostSize=0 server.tomcat.acceptCount=300#Dubbo?provider?configurationdubbo.protocol.id=dubbo dubbo.protocol.name=dubbo dubbo.application.name=seata-stock-service dubbo.application.id=seata-stock-service dubbo.registry.protocol=dubbo dubbo.registry.address=nacos://192.168.56.101:8848 dubbo.protocol.name=dubbo dubbo.protocol.port=20981 dubbo.protocol.threads=2000 dubbo.protocol.queues=1000 dubbo.protocol.threadpool=cached dubbo.provider.retries=3 dubbo.provider.threadpool=cached dubbo.provider.threads=2000 dubbo.provider.connections=2000 dubbo.provider.acceptes=0 dubbo.provider.executes=0 dubbo.consumer.actives=0 dubbo.scan.base-packages=org.sky.stock.service.dubbo logging.config=classpath:log4j2.xml它的registry.conf和seata-product-service中的registry.conf內(nèi)容完全一模一樣,此處就不再重復(fù)了。可以看到它是另一個dubbo provider實例,它會運行在20981端口,同時請注意它的dubbo.application.name,和seata-product-service是不同的。
DruidConfig.java
package?org.sky.stock.config;import?javax.sql.DataSource;import?org.springframework.beans.factory.annotation.Value; import?org.springframework.boot.autoconfigure.EnableAutoConfiguration; import?org.springframework.boot.context.properties.ConfigurationProperties; import?org.springframework.boot.web.servlet.FilterRegistrationBean; import?org.springframework.boot.web.servlet.ServletRegistrationBean; import?org.springframework.context.annotation.Bean; import?org.springframework.context.annotation.Configuration; import?org.springframework.context.annotation.Primary; import?org.springframework.jdbc.core.JdbcTemplate; import?org.springframework.jdbc.datasource.DataSourceTransactionManager; import?com.alibaba.druid.pool.DruidDataSource; import?com.alibaba.druid.support.http.StatViewServlet; import?com.alibaba.druid.support.http.WebStatFilter; import?io.seata.rm.datasource.DataSourceProxy; import?io.seata.spring.annotation.GlobalTransactionScanner;@Configuration public?class?DruidConfig?{@Bean@ConfigurationProperties(prefix?=?"spring.datasource")public?DruidDataSource?druidDataSource()?{return?new?DruidDataSource();}/***?init?datasource?proxy*?*?@Param:?druidDataSource?datasource?bean?instance*?@Return:?DataSourceProxy?datasource?proxy*/@Beanpublic?DataSourceProxy?dataSourceProxy(DruidDataSource?druidDataSource)?{return?new?DataSourceProxy(druidDataSource);}@Beanpublic?DataSourceTransactionManager?transactionManager(DataSourceProxy?dataSourceProxy)?{return?new?DataSourceTransactionManager(dataSourceProxy);}/***?init?jdbc?template?by?using?the?dataSourceProxy*?*?@Return:?JdbcTemplate*/@Beanpublic?JdbcTemplate?dataSource(DataSourceProxy?dataSourceProxy)?{return?new?JdbcTemplate(dataSourceProxy);}/***?init?global?transaction?scanner**?@Return:?GlobalTransactionScanner*/@Beanpublic?GlobalTransactionScanner?globalTransactionScanner()?{return?new?GlobalTransactionScanner("seata-stock-service",?"demo-tx-grp");}可以看到此處的GlobalTransactionScanner的第一個參數(shù)是貼合著dubbo.application.name的。
StockDAO.java
package?org.sky.stock.dao;import?org.sky.exception.DemoRpcRunTimeException;public?interface?StockDAO?{public?void?addNewStock(long?productId,?int?stock)?throws?DemoRpcRunTimeException; }StockDAOImpl.java
package?org.sky.stock.dao;import?javax.annotation.Resource;import?org.sky.dao.BaseDAO; import?org.sky.exception.DemoRpcRunTimeException; import?org.springframework.jdbc.core.JdbcTemplate; import?org.springframework.stereotype.Component;@Component public?class?StockDAOImpl?extends?BaseDAO?implements?StockDAO?{@Resourceprivate?JdbcTemplate?jdbcTemplate;@Overridepublic?void?addNewStock(long?productId,?int?stock)?throws?DemoRpcRunTimeException?{String?prodSql?=?"insert?into?t_stock(product_id,stock)values(?,?)";try?{jdbcTemplate.update(prodSql,?productId,?stock);}?catch?(Exception?e)?{throw?new?DemoRpcRunTimeException(e.getMessage(),?e);}}}StockBizServiceImpl.java
package?org.sky.stock.service.biz;import?org.sky.exception.DemoRpcRunTimeException; import?org.sky.service.BaseService; import?org.sky.stock.dao.StockDAO; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.stereotype.Service;@Service public?class?StockBizServiceImpl?extends?BaseService?implements?StockBizService?{@Autowiredprivate?StockDAO?stockDAO;@Overridepublic?void?addStock(long?productId,?int?stock)?throws?DemoRpcRunTimeException?{try?{stockDAO.addNewStock(productId,?stock);}?catch?(Exception?e)?{throw?new?DemoRpcRunTimeException("error?occured?on?Biz?Service?Side:?"?+?e.getMessage(),?e);}}}StockDubboServiceImpl.java
啟動用Application.java
package?org.sky.stock;import?org.apache.dubbo.config.spring.context.annotation.EnableDubbo; import?org.springframework.boot.SpringApplication; import?org.springframework.boot.autoconfigure.EnableAutoConfiguration; import?org.springframework.context.annotation.ComponentScan; import?org.springframework.transaction.annotation.EnableTransactionManagement;@EnableDubbo @EnableAutoConfiguration @ComponentScan(basePackages?=?{?"org.sky.stock"?})public?class?Application?{public?static?void?main(String[]?args)?{SpringApplication.run(Application.class,?args);}}同樣,我們把它啟動起來。
seata-demo-consumer工程
這個就是一個純 spring boot controller工程了,它給我們提供一個人機界面,我們可以通過它post一個json請求以達到后端兩個個不同的dubbo provider service向兩個不同的數(shù)據(jù)庫插入數(shù)據(jù),如果有任意錯誤也可以進行全部的回滾。
pom.xml
<project?xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.sky.demo</groupId><artifactId>seata-demo-consumer</artifactId><version>0.0.1-SNAPSHOT</version><parent><groupId>org.sky.demo</groupId><artifactId>nacos-parent</artifactId><version>0.0.1-SNAPSHOT</version></parent><packaging>war</packaging><description>Demo?project?Dubbo+Nacos+Seata</description><dependencies><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-core</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-spring</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId><scope>compile</scope></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--?Dubbo?Registry?Nacos?--><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-registry-nacos</artifactId></dependency><dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId></dependency><dependency><groupId>org.sky.demo</groupId><artifactId>skycommon</artifactId><version>${skycommon.version}</version></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId></dependency><dependency><groupId>com.alibaba.boot</groupId><artifactId>nacos-config-spring-boot-starter</artifactId><exclusions><exclusion><artifactId>nacos-client</artifactId><groupId>com.alibaba.nacos</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>javax.servlet</groupId><artifactId>javax.servlet-api</artifactId><version>${javax.servlet.version}</version><scope>provided</scope></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/webapp</directory><targetPath>META-INF/resources</targetPath><includes><include>**/**</include></includes></resource><resource><directory>src/main/resources</directory><filtering>true</filtering><includes><include>application.properties</include><include>application-${profileActive}.properties</include></includes></resource></resources></build> </project>application.properties
此處的application.properties相對provider工程來說會比較簡單,它也不需要數(shù)據(jù)庫連接信息。
server.port=8082 server.tomcat.maxConnections=300 server.tomcat.maxThreads=300 server.tomcat.uriEncoding=UTF-8 server.tomcat.maxThreads=300 server.tomcat.minSpareThreads=150 server.connectionTimeout=20000 server.tomcat.maxHttpPostSize=0 server.tomcat.acceptCount=300#Dubbo?provider?configuration dubbo.protocol.id=dubbo dubbo.protocol.name=dubbo dubbo.application.name=demo-seata-consumer dubbo.application.id=demo-seata-consumer dubbo.registry.protocol=dubbo dubbo.registry.address=nacos://192.168.56.101:8848 #dubbo.consumer.time=120000logging.config=classpath:log4j2.xml但是一樣,它也需要連接nacos來Reference dubbo provider的應(yīng)用。我們接下來看它的基于于spring boot的自動裝配,SeataAutoConfig.java,由于它根本不需要連接什么DB,因此它里面只有一個GlobalTransactionScanner。
SeataAutoConfig.java
package?org.sky.seatademo.config;import?io.seata.spring.annotation.GlobalTransactionScanner;import?org.springframework.boot.context.properties.ConfigurationProperties; import?org.springframework.context.annotation.Bean; import?org.springframework.context.annotation.Configuration; @Configuration public?class?SeataAutoConfig?{/***?init?global?transaction?scanner**?@Return:?GlobalTransactionScanner*/@Beanpublic?GlobalTransactionScanner?globalTransactionScanner()?{return?new?GlobalTransactionScanner("demo-seata-consumer",?"demo-tx-grp");} }注意:這邊的GlobalTransactionScanner中的第一個參數(shù),這個參數(shù)是seata-demo-consumer的dubbo.application.name,同時也是后面我們要生效的Global Transaction的Scope。
來看核心業(yè)務(wù)邏輯調(diào)用:
BusinessDubboServiceImpl.java
package?org.sky.seatademo.service.biz;import?org.apache.dubbo.config.annotation.Reference; import?org.sky.exception.DemoRpcRunTimeException; import?org.sky.product.service.dubbo.ProductDubboService; import?org.sky.seatademo.vo.SeataProductVO; import?org.sky.service.BaseService; import?org.sky.stock.service.dubbo.StockDubboService; import?org.springframework.stereotype.Service;import?io.seata.core.context.RootContext; import?io.seata.spring.annotation.GlobalTransactional;@Service public?class?BusinessDubboServiceImpl?extends?BaseService?implements?BusinessDubboService?{@Reference(version?=?"1.0.0")private?ProductDubboService?productDubboService;@Reference(version?=?"1.0.0")private?StockDubboService?stockDubboService;@Override@GlobalTransactional(timeoutMills?=?300000,?name?=?"seata-at-service")public?SeataProductVO?addProductAndStock(SeataProductVO?vo)?throws?DemoRpcRunTimeException?{SeataProductVO?rtnProduct?=?new?SeataProductVO();try?{//?logger.info("======>start?global?transaction:?"?+?RootContext.getXID());long?prodId?=?productDubboService.addProduct(vo.getProductName());stockDubboService.addStock(prodId,?vo.getStock());rtnProduct.setProductId(prodId);rtnProduct.setProductName(vo.getProductName());rtnProduct.setStock(vo.getStock());if?(vo.getProductName().equalsIgnoreCase("donny"))?{throw?new?Exception("Mk?Thru?The?Exception?To?Force?Rollback");}?else?{return?rtnProduct;}}?catch?(Exception?e)?{logger.error("error?occured?on?dubbo?BusinessService?side:?"?+?e.getMessage(),?e);throw?new?DemoRpcRunTimeException("error?occured?on?dubbo?BusinessService?side:?"?+?e.getMessage(),?e);}}@Override@GlobalTransactional(timeoutMills?=?120000,?name?=?"demo-seata-consumer")public?SeataProductVO?addProductAndStockFailed(SeataProductVO?vo)?throws?DemoRpcRunTimeException?{SeataProductVO?rtnProduct?=?new?SeataProductVO();try?{logger.info("======>start?global?transaction:?"?+?RootContext.getXID());long?prodId?=?productDubboService.addProduct(vo.getProductName());stockDubboService.addStock(prodId,?vo.getStock());rtnProduct.setProductId(prodId);rtnProduct.setProductName(vo.getProductName());rtnProduct.setStock(vo.getStock());throw?new?Exception("Mk?throw?the?exception?to?enforce?rollback?all?transaction");}?catch?(Exception?e)?{logger.error("error?occured?on?dubbo?BusinessService?side:?"?+?e.getMessage(),?e);throw?new?DemoRpcRunTimeException("error?occured?on?dubbo?BusinessService?side:?"?+?e.getMessage(),?e);}} }看到?jīng)]有,首先:
它會@Reference兩個dubbo provider的引用,然后在一個business方法內(nèi),它開始“切面”事務(wù)了,這邊事務(wù)的范圍正是用的這個name="demo-seata-consumer”來標記的。
同時,我們在這個Service中有一段邏輯,即當productName為donny時,那么故意我們會拋出一個錯誤來以迫使全局事務(wù)回滾!
必竟donny是一個人,它不是一個“合格”的商品!
值的高度注意的一點?,為什么我說截止目前為止網(wǎng)上的例子都是跑不通的呢?
是因為,SEATA使用到的mysql里的幾個用于作持久化作用的表的transaction_name和group_name字段都是32位長度的字符,而那些例子里的application_name和group_name都大于了32位,SEATA運行時就已經(jīng)報錯了更不要說后面還能把事務(wù)給做對了呢。
對于人機交五用界面用的SeataDemoController.java
package?org.sky.seatademo.controller;import?java.util.HashMap; import?java.util.Map;import?org.apache.dubbo.config.annotation.Reference; import?org.sky.controller.BaseController; import?org.sky.seatademo.service.biz.BusinessDubboService; import?org.sky.seatademo.vo.SeataProductVO; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.http.HttpHeaders; import?org.springframework.http.HttpStatus; import?org.springframework.http.MediaType; import?org.springframework.http.ResponseEntity; import?org.springframework.web.bind.annotation.PostMapping; import?org.springframework.web.bind.annotation.RequestBody; import?org.springframework.web.bind.annotation.RequestMapping; import?org.springframework.web.bind.annotation.RestController;import?com.alibaba.fastjson.JSON; import?com.alibaba.fastjson.JSONObject;@RestController @RequestMapping("seatademo") public?class?SeataDemoController?extends?BaseController?{@Autowiredprivate?BusinessDubboService?businessDubboService;@PostMapping(value?=?"/addSeataProduct",?produces?=?"application/json")public?ResponseEntity<String>?addSeataProduct(@RequestBody?String?params)?throws?Exception?{ResponseEntity<String>?response?=?null;String?returnResultStr;HttpHeaders?headers?=?new?HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON_UTF8);Map<String,?Object>?result?=?new?HashMap<>();try?{JSONObject?requestJsonObj?=?JSON.parseObject(params);SeataProductVO?inputProductPara?=?getSeataProductFromJson(requestJsonObj);SeataProductVO?returnData?=?businessDubboService.addProductAndStock(inputProductPara);result.put("code",?HttpStatus.OK.value());result.put("message",?"add?a?new?product?successfully");result.put("productid",?returnData.getProductId());result.put("productname",?returnData.getProductName());result.put("stock",?returnData.getStock());returnResultStr?=?JSON.toJSONString(result);response?=?new?ResponseEntity<>(returnResultStr,?headers,?HttpStatus.OK);}?catch?(Exception?e)?{logger.error("add?a?new?product?with?error:?"?+?e.getMessage(),?e);result.put("message",?"add?a?new?product?with?error:?"?+?e.getMessage());returnResultStr?=?JSON.toJSONString(result);response?=?new?ResponseEntity<>(returnResultStr,?headers,?HttpStatus.EXPECTATION_FAILED);}return?response;}@PostMapping(value?=?"/addSeataProductFailed",?produces?=?"application/json")public?ResponseEntity<String>?addSeataProductFailed(@RequestBody?String?params)?throws?Exception?{ResponseEntity<String>?response?=?null;String?returnResultStr;HttpHeaders?headers?=?new?HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON_UTF8);Map<String,?Object>?result?=?new?HashMap<>();try?{JSONObject?requestJsonObj?=?JSON.parseObject(params);SeataProductVO?inputProductPara?=?getSeataProductFromJson(requestJsonObj);SeataProductVO?returnData?=?businessDubboService.addProductAndStockFailed(inputProductPara);result.put("code",?HttpStatus.OK.value());result.put("message",?"add?a?new?product?successfully");result.put("productid",?returnData.getProductId());result.put("productname",?returnData.getProductName());result.put("stock",?returnData.getStock());returnResultStr?=?JSON.toJSONString(result);response?=?new?ResponseEntity<>(returnResultStr,?headers,?HttpStatus.OK);}?catch?(Exception?e)?{logger.error("add?a?new?product?with?error:?"?+?e.getMessage(),?e);result.put("message",?"add?a?new?product?with?error:?"?+?e.getMessage());returnResultStr?=?JSON.toJSONString(result);response?=?new?ResponseEntity<>(returnResultStr,?headers,?HttpStatus.EXPECTATION_FAILED);}return?response;} }然后就是啟動用Application.java
這個工程我們把它做成了可以用于在tomcat里部署的war包的形式,因為有不少讀者習(xí)慣于用tomcat來部署,此處用于演示如何把springboot的工程做成war。
我們把它也啟動起來
這是seata server后端的日志顯示
nacos中對于seatar的配置的顯示?
測試正反兩個實例
我們先post一段正常的商品信息
{"productname"?:?"apple","stock"?:?10000 }看上面,我們可以看到TM在進行全局事務(wù)的commit。
我們再來提交一個“不正常”的請求
{"productname"?:?"donny","stock"?:?10000 }我們可以看到它觸發(fā)了邏輯拋錯
分別觀察兩個provider的后端
看,TM協(xié)調(diào)下,RM里的相應(yīng)的遠程Service進行了rollback。
再來看數(shù)據(jù)庫端
剛才的那條叫donny的“劣質(zhì)商品”沒有被插進去,它被SEATA給回滾了。
我們接下來要開始詳細講述基于springboot+dubbo+nacos+seata如何實現(xiàn)TCC事務(wù)的實現(xiàn),我們會完成一個跨行轉(zhuǎn)帳的實例。
基于TCC的分布式事務(wù)詳解
?
前面我們講述了SEATA的基于2PC的AT事務(wù)實戰(zhàn)篇。在下篇中我們將會非常詳細的描述一下如何利用SEATA來實現(xiàn)TCC事務(wù)補償機制的原理。
我們要使用的TCC例子-跨行轉(zhuǎn)款問題
還記得我們在前面曾經(jīng)出現(xiàn)過這么一個例子用于詳細描述TCC描述事務(wù)的原理吧?
現(xiàn)在我們就會圍繞著這個例子來進一步用代碼演示它。所有代碼我已經(jīng)上傳到了我的GIT上了,地址在這:https://github.com/mkyuangithub/mkyuangithub
我們假設(shè)有這么一個業(yè)務(wù)場景:
你的公司是一家叫moneyking的第三方支付公司,連接著幾個主要的銀行支付渠道;
現(xiàn)在有一個帳戶A要通過工行向另一個位于招商銀行的B帳戶轉(zhuǎn)帳;
?
轉(zhuǎn)帳要么成功要么失敗。
于是我們結(jié)合著例子創(chuàng)建了3個項目:
tcc-bank-cmb
tcc-bank-icbc
tcc-money-king
tcc-bank-cmb和tcc-bank-icbc都是dubbo provider,它們分別連接著自己的數(shù)據(jù)庫(不同的url)。
兩個不同的schema,一個schema叫bank_icbc,一個schema叫bank_cmb,每個schema中的表結(jié)構(gòu)相同。
我們下面給出相關(guān)的建表語句,每個業(yè)務(wù)表內(nèi)的undo_log請各位看前面篇幅中所介紹的內(nèi)容(內(nèi)含有undo_log表建表語句)。
CREATE?TABLE?`bank_account`?(`account_id`?varchar(32)?COLLATE?utf8_bin?NOT?NULL,`amount`?double(11,2)?DEFAULT?'0.00',`freezed_amount`?double(11,2)?DEFAULT?'0.00',PRIMARY?KEY?(`account_id`) )?ENGINE=InnoDB?DEFAULT?CHARSET=utf8?COLLATE=utf8_bin;工程詳細講解tcc-bank-icbc和tcc-bank-cmb分別連接著這2個schema。而tcc-money-king就是一個consumer,它來模擬你所在的那家第三方支付公司,所有的客戶都是通過tcc-money-king來進行轉(zhuǎn)帳的。
如前面內(nèi)容一樣,我們在講述具體的代碼前先要把tcc如何在SEATA中實現(xiàn)的一些個坑給“填了”。
全局事務(wù)
和SEATA中的AT模式不同TCC的全局事務(wù)不需要你設(shè)置datasourceProxy代理,它只需要把事務(wù)范圍和事務(wù)組申明好就可以了。
我們這邊的事務(wù)組如下所示:
事務(wù)組:demo-tx-grp
事務(wù)邊界:tcc-bank-sample,此處這個邊界就是指的是tcc-money-king項目的dubbo.application.name。
在我們的tcc-money-king中有一個業(yè)務(wù)方法,在這個業(yè)務(wù)方法中只需要如此使用@GlobalTransaction申明即可啟用SEATA的tcc機制。
Spring Boot工程中不可以使用注解來申明dubbo的坑
沒錯!截止發(fā)稿稿為止seata-1.0GA的tcc不支持@Service, @Reference這樣注解方式的dubbo發(fā)布,它雖然不會出錯可是會使得整個tcc事務(wù)失效(AT模式中是完全可以使用注解模式的,TCC模式目前還不支持),只有那些使用普通的spring的.xml配置來申明的provider和reference才能享受tcc的“盛餐”。
那么這對于我們的spring boot工程來說豈不是很“惡心”的一件事?不要急,筆者已經(jīng)探索出來了一條“熊掌與魚兼得”法,即混用springboot和普通spring .xml文件配置。
即,只對dubbo bean進行.xml配置而對其它我們堅持可以使用spring boot的全注解方法來搭建整個項目的框架,見下例。
這邊除了dubbo和一個比較特殊的transactionTemplate需要使用.xml,其它我們照樣可以使用spring boot的全注解配置yyaa,只需要在我們的XxxConfig文件內(nèi)寫上這么一句即可:
@Configuration @ImportResource(locations?=?{?"spring/spring-bean.xml",?"spring/dubbo-bean.xml"?}) public?class?TccBankConfig?{然后在你使用到的地方比如說我們在tcc-money-king中使用了.xml文件配置一個dubbo的引用,那么此時你只需要在你要Reference的Service方法內(nèi)@Autowired一下即可,如下例:
開始講解所有工程中的代碼
整個tcc它圍繞著try, confirm, cancel這3個方法來運作的。這使得你需要使用tcc事務(wù)的話就必須對原有代碼有侵入性。可是SEATA在這方面做的很好,它j通過遠程調(diào)用、AOP來做的全局事務(wù)切入進而實現(xiàn)這一過程的。
所以在seata tcc編程中最最重要的有這么幾個元素:
Global Transactional
Transaction Template
Transaction Manager
下面我們就以實例來感受seata tcc是如何做到盡量少侵入業(yè)務(wù)代碼、又能做到性能最優(yōu)、同時做到數(shù)據(jù)的最終一致性吧。
tcc-bank-icbc
pom.xml
<project?xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.sky.demo</groupId><artifactId>nacos-parent</artifactId><version>0.0.1-SNAPSHOT</version></parent><groupId>org.sky.demo</groupId><artifactId>tcc-bank-icbc</artifactId><version>0.0.1</version><name>tcc-bank-icbc</name><description>Demo?project?Dubbo+Nacos+SeataTCC</description><properties><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding></properties><dependencies><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId></dependency><dependency><groupId>org.mybatis</groupId><artifactId>mybatis-spring</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-core</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-spring</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--?Dubbo?Registry?Nacos?--><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-registry-nacos</artifactId></dependency><dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId></dependency><dependency><groupId>org.sky.demo</groupId><artifactId>skycommon</artifactId><version>${skycommon.version}</version></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId></dependency><dependency><groupId>com.alibaba.boot</groupId><artifactId>nacos-config-spring-boot-starter</artifactId><exclusions><exclusion><artifactId>nacos-client</artifactId><groupId>com.alibaba.nacos</groupId></exclusion></exclusions></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><finalName>${project.artifactId}</finalName><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><jvmArguments>-Dfile.encoding=UTF-8</jvmArguments></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-war-plugin</artifactId><version>2.6</version><configuration><failOnMissingWebXml>false</failOnMissingWebXml></configuration></plugin></plugins><resources><resource><directory>src/main/resources</directory><excludes><exclude>application*.properties</exclude></excludes></resource><resource><directory>src/main/webapp</directory><targetPath>META-INF/resources</targetPath><includes><include>**/**</include></includes></resource><resource><directory>src/main/resources</directory><filtering>true</filtering><includes><include>application.properties</include></includes></resource></resources></build> </project>application.properties
由于我們對于dubbo需要使用.xml文件的方式配置,因此我們的application.properties文件內(nèi)容相對簡單。
TccBankConfig.java
package?org.sky.tcc.bank.icbc.config;import?org.springframework.boot.context.properties.ConfigurationProperties; import?org.springframework.context.annotation.Bean; import?org.springframework.context.annotation.Configuration; import?org.springframework.context.annotation.ImportResource; import?org.springframework.jdbc.core.JdbcTemplate; import?org.springframework.jdbc.datasource.DataSourceTransactionManager;import?com.alibaba.druid.pool.DruidDataSource;import?io.seata.spring.annotation.GlobalTransactionScanner;@Configuration @ImportResource(locations?=?{?"spring/spring-bean.xml",?"spring/dubbo-bean.xml"?}) public?class?TccBankConfig?{@Bean@ConfigurationProperties(prefix?=?"spring.datasource")public?DruidDataSource?druidDataSource()?{return?new?DruidDataSource();}@Beanpublic?DataSourceTransactionManager?transactionManager(DruidDataSource?druidDataSource)?{return?new?DataSourceTransactionManager(druidDataSource);}@Beanpublic?JdbcTemplate?jdbcTemplate(DruidDataSource?druidDataSource)?{return?new?JdbcTemplate(druidDataSource);}@Beanpublic?GlobalTransactionScanner?globalTransactionScanner()?{return?new?GlobalTransactionScanner("tcc-bank-icbc",?"demo-tx-grp");}}這個就是我們的全局配置類,在這個配置類內(nèi)對于datasource,transaction manager, global transactional我們使用的是全注解。
我們在spring/spring-bean.xml文件內(nèi)申明了transactional template
spring/spring-bean.xml
<?xml?version="1.0"?encoding="UTF-8"?> <!--?~?Copyright?1999-2018?Alibaba?Group?Holding?Ltd.?~?~?Licensed?under?the?Apache?License,?Version?2.0?(the?"License");?~?you?may?not?use?this?file?except?in?compliance?with?the?License.?~?You?may?obtain?a?copy?of?the?License?at?~?~?http://www.apache.org/licenses/LICENSE-2.0?~?~?Unless?required?by?applicable?law?or?agreed?to?in?writing,?software?~?distributed?under?the?License?is?distributed?on?an?"AS?IS"?BASIS,?~?WITHOUT?WARRANTIES?OR?CONDITIONS?OF?ANY?KIND,?either?express?or?implied.?~?See?the?License?for?the?specific?language?governing?permissions?and?~?limitations?under?the?License.?--> <beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsdhttp://code.alibabatech.com/schema/dubbohttp://code.alibabatech.com/schema/dubbo/dubbo.xsd"default-autowire="byName"><bean?id="transactionTemplate"class="org.springframework.transaction.support.TransactionTemplate"><property?name="propagationBehaviorName"><value>PROPAGATION_REQUIRES_NEW</value></property><property?name="transactionManager"><ref?bean="transactionManager"?/></property></bean> </beans>對于dubbo我們使用的是spring/dubbo-bean.xml來配置的
spring/dubbo-bean.xml
我們可以看到在這個dubbo-bean.xml文件中我們配置了一個核心的org.sky.tcc.bank.icbc.dubbo.MinusMoneyAction,我們先來看這個MinusMoneyAction。
因為我們是從:
工行劃款;
招行打款;
因此我們相應(yīng)的在tcc-bank-cmb中還有一個核心的dubbo叫PlusMoneyAction。
MinusMoneyAction的接口類,注意此接口類為一個“殘根”即“被調(diào)用者”,因此我們把它放置于了skycommon工程內(nèi)了。
MinusMoneyAction.java
package?org.sky.tcc.bank.icbc.dubbo;import?io.seata.rm.tcc.api.BusinessActionContext; import?io.seata.rm.tcc.api.BusinessActionContextParameter; import?io.seata.rm.tcc.api.TwoPhaseBusinessAction;public?interface?MinusMoneyAction?{public?String?sayHello()?throws?RuntimeException;/***?一階段從from帳戶扣錢*?*?@param?businessActionContext*?@param?accountNo*?@param?amount*/@TwoPhaseBusinessAction(name?=?"minusMoneyAction",?commitMethod?=?"commit",?rollbackMethod?=?"rollback")public?boolean?prepareMinus(BusinessActionContext?businessActionContext,@BusinessActionContextParameter(paramName?=?"accountNo")?String?accountNo,@BusinessActionContextParameter(paramName?=?"amount")?double?amount);/***?二階段提交*?*?@param?businessActionContext*?@return*/public?boolean?commit(BusinessActionContext?businessActionContext);/***?二階段回滾*?*?@param?businessActionContext*?@return*/public?boolean?rollback(BusinessActionContext?businessActionContext); }我們可以通過實現(xiàn)類看到它其實是事先了tcc的3個階段:
commit方法對confirm
rollback方法對cancel
prepareMinus方法對try
這3個方法的實現(xiàn)就是讓我們在盡量少破壞業(yè)務(wù)代碼的方法下實現(xiàn)tcc補償式事務(wù)的。這3個方法是相當特殊的,它們的調(diào)用為“被SEATA server端全自動異步回調(diào)”,即不需要你try if xxx catch exception rollback的,你要做的只是告訴業(yè)務(wù)方法在何種狀態(tài)它應(yīng)該要rollback;何種狀態(tài)屬于調(diào)用成功即自動commit。一切都是自動的。
而這邊的commit也不是我們傳統(tǒng)意義的數(shù)據(jù)庫層面的commit。
讓我們來一起看一下它的實現(xiàn)類
MinusMoneyActionImpl.java
package?org.sky.tcc.bank.icbc.dubbo;import?org.sky.service.BaseService; import?org.sky.tcc.bank.icbc.dao.TransferMoneyDAO; import?org.sky.tcc.bean.AccountBean; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.transaction.TransactionStatus; import?org.springframework.transaction.support.TransactionCallback; import?org.springframework.transaction.support.TransactionTemplate;import?io.seata.core.context.RootContext; import?io.seata.rm.tcc.api.BusinessActionContext;public?class?MinusMoneyActionImpl?extends?BaseService?implements?MinusMoneyAction?{/***?扣錢賬戶?DAO*/@Autowiredprivate?TransferMoneyDAO?transferMoneyDAO;/***?扣錢數(shù)據(jù)源事務(wù)模板*/@Autowiredprivate?TransactionTemplate?transactionTemplate;@Overridepublic?String?sayHello()?throws?RuntimeException?{return?"hi?I?am?icbc-dubbo";}@Overridepublic?boolean?prepareMinus(BusinessActionContext?businessActionContext,?String?accountNo,?double?amount)?{logger.info("==========into?prepareMinus");//?分布式事務(wù)IDfinal?String?xid?=?RootContext.getXID();return?transactionTemplate.execute(new?TransactionCallback<Boolean>()?{@Overridepublic?Boolean?doInTransaction(TransactionStatus?status)?{try?{//?校驗賬戶余額AccountBean?account?=?transferMoneyDAO.getAccountForUpdate(accountNo);if?(account?==?null)?{throw?new?RuntimeException("賬戶不存在");}if?(account.getAmount()?-?amount?<?0)?{throw?new?RuntimeException("余額不足");}//?凍結(jié)轉(zhuǎn)賬金額double?freezedAmount?=?account.getFreezedAmount()?+?amount;account.setFreezedAmount(freezedAmount);transferMoneyDAO.updateFreezedAmount(account);logger.info(String.format("======>prepareMinus?account[%s]?amount[%f],?dtx?transaction?id:?%s.",accountNo,?amount,?xid));return?true;}?catch?(Throwable?t)?{logger.error("======>error?occured?in?MinusMoneyActionImpl.prepareMinus:?"?+?t.getMessage(),t.getCause());status.setRollbackOnly();return?false;}}});}@Overridepublic?boolean?commit(BusinessActionContext?businessActionContext)?{logger.info("======>into?MinusMoneyActionImpl.commit()?method");//?分布式事務(wù)IDfinal?String?xid?=?RootContext.getXID();//?賬戶IDfinal?String?accountNo?=?String.valueOf(businessActionContext.getActionContext("accountNo"));//?轉(zhuǎn)出金額final?double?amount?=?Double.valueOf(String.valueOf(businessActionContext.getActionContext("amount")));return?transactionTemplate.execute(new?TransactionCallback<Boolean>()?{@Overridepublic?Boolean?doInTransaction(TransactionStatus?status)?{try?{AccountBean?account?=?transferMoneyDAO.getAccountForUpdate(accountNo);//?扣除賬戶余額double?newAmount?=?account.getAmount()?-?amount;if?(newAmount?<?0)?{throw?new?RuntimeException("余額不足");}account.setAmount(newAmount);//?釋放賬戶?凍結(jié)金額account.setFreezedAmount(account.getFreezedAmount()?-?amount);transferMoneyDAO.updateAmount(account);logger.info(String.format("======>minus?account[%s]?amount[%f],?dtx?transaction?id:?%s.",?accountNo,amount,?xid));return?true;}?catch?(Throwable?t)?{logger.error("======>error?occured?in?MinusMoneyActionImpl.commit:?"?+?t.getMessage(),t.getCause());status.setRollbackOnly();return?false;}}});}@Overridepublic?boolean?rollback(BusinessActionContext?businessActionContext)?{logger.info("======>into?MinusMoneyActionImpl.rollback()?method");//?分布式事務(wù)IDfinal?String?xid?=?RootContext.getXID();//?賬戶IDfinal?String?accountNo?=?String.valueOf(businessActionContext.getActionContext("accountNo"));//?轉(zhuǎn)出金額final?double?amount?=?Double.valueOf(String.valueOf(businessActionContext.getActionContext("amount")));return?transactionTemplate.execute(new?TransactionCallback<Boolean>()?{@Overridepublic?Boolean?doInTransaction(TransactionStatus?status)?{try?{AccountBean?account?=?transferMoneyDAO.getAccountForUpdate(accountNo);if?(account?==?null)?{//?賬戶不存在,回滾什么都不做return?true;}//?釋放凍結(jié)金額if?(account.getFreezedAmount()?>=?amount)?{account.setFreezedAmount(account.getFreezedAmount()?-?amount);transferMoneyDAO.updateFreezedAmount(account);}logger.info(String.format("======>Undo?prepareMinus?account[%s]?amount[%f],?dtx?transaction?id:?%s.",accountNo,?amount,?xid));return?true;}?catch?(Throwable?t)?{logger.error("======>error?occured?in?MinusMoneyActionImpl.rollback:?"?+?t.getMessage(),t.getCause());status.setRollbackOnly();return?false;}}});}}從以上代碼我們可以看到它是一個“工行劃款”的全過程。
一開始它會從prepareMinus方法走起,你在consumer端只需要調(diào)用這個prepareMinus然后后面的commit與rollback是SEATA根據(jù)業(yè)務(wù)方法執(zhí)行的狀態(tài)自動回調(diào)并決定后一步調(diào)用到底是調(diào)用commit還是調(diào)用rollback的,即在consumer端的業(yè)務(wù)方法內(nèi)是不含有commit和rollback的。
此處的PrepareMinus要做的事就是:
先檢查帳戶是否存,如果不存在直接拋出一個RuntimeException迫使全局事務(wù)走后面的rollback分支而不繼續(xù)走數(shù)據(jù)庫的commit也不走該MinusMoneyAction中的commit(第二步);
如果要轉(zhuǎn)帳的數(shù)額大于余額,那肯定也不行的,會拋錯;
檢查好了,它這邊開始做業(yè)務(wù)冪等了,即為了后面的業(yè)務(wù)rollback做準備,它會先把一個“凍結(jié)余額”+“轉(zhuǎn)帳額”。
這就是prepare階段,prepare階段如果成功SEATA會自動走下一步commit,如果遇到有問題就可以運行rollback方法。
那么我們來看業(yè)務(wù)commit(即confirm)方法吧:
經(jīng)過了上述的prepare過程,一切無誤,那么我們就要開始扣款了。為了做到業(yè)務(wù)冪等,在此要再做一次余額校驗,因為spring中的bean都是“非線程安全”的,此時可能由于并發(fā)操作的原因,在過了commit方法后實際數(shù)據(jù)庫內(nèi)的余額因為其它生產(chǎn)上的一些業(yè)務(wù)方法導(dǎo)致了這個余額已經(jīng)低于轉(zhuǎn)帳額了,因此在這里要再做一次校驗,如果校驗不通過那么拋出RuntimeException。
把要轉(zhuǎn)帳帳戶from_account扣去轉(zhuǎn)帳額然后把中間狀態(tài) freezed_amount-轉(zhuǎn)帳額以還原到原有狀態(tài),整個“工行階段的業(yè)務(wù) ”結(jié)束。
很多人在此處要問,為什么需要增加一個freezed_amount,直接扣不就完了。
是!你可以直接扣,可是我們前面說過冪等了,那么請問你在commit或者是在rollback時你會怎么回滾這個數(shù)據(jù)?
我們?nèi)瞬僮鞯脑捑褪窃瓉磙D(zhuǎn)出10元,失敗了,把10元退給原帳戶!
因此我們這邊拿了這個freezed_amount就是來做計算機可以認得的這個“中間暫存”變量。還記得我們在上篇中提到的業(yè)務(wù)冪等嗎?我們需要保存一切中間狀態(tài)以便于“業(yè)務(wù)回退/反交易”。
那么我們下面來看看這個“業(yè)務(wù)回退”是怎么樣的,即rollback方法:
整個劃款過程分為2個步驟,6個小步驟。它們是:minusMoneyAction, plusMoneyAction,每個步驟都有prepare,commit, rollback。
此處的rollback為業(yè)務(wù)rollback,即在6個分解的小步驟中有任何一步拋RuntimeException那么SEATA會自動觸發(fā)兩個大步驟中的rollback。
rollback要做的,拿icbc是扣款來說就是一個“業(yè)務(wù)回退”,它先查詢該帳戶是否存在,如果不存在那也不要做了,帳戶不存在不存在任何拋錯只要return true就可以了什么都不用做。這邊的return true是什么意思尼?這叫空回滾。
所謂空回滾就是事務(wù)協(xié)調(diào)器在調(diào)用TCC服務(wù)的一階段Try操作時,可能會出現(xiàn)因為丟包而導(dǎo)致的網(wǎng)絡(luò)超時,此時事務(wù)協(xié)調(diào)器會觸發(fā)二階段回滾,調(diào)用TCC服務(wù)的Cancel操作;TCC服務(wù)在未收到Try請求的情況下收到Cancel請求,這種場景被稱為空回滾;TCC服務(wù)在實現(xiàn)時應(yīng)當允許空回滾的執(zhí)行;如果你覺得前面這段話有點拗口,那么我們再說了白一點,看下圖就能理解了
從上圖看到,這個rollback以返回true來判定回滾成功,此時你要不給它true給它false或者是Exception的話它就會不斷的嘗試回滾,于是你在后臺會看到一堆的try rollback but failed tryagain...要try多少次呢?它是依賴于SEATA server端的conf/nacos-config.txt中的這么幾個參數(shù)來設(shè)定的。
client.tm.commit.retry.count=1 client.tm.rollback.retry.count=1你現(xiàn)在理解為什么在rollback調(diào)用時如果檢查到了帳戶已經(jīng)不存在,直接返回true而不需要再thru什么Exception或者是return false了吧?再加上你如果前面這2個retry.count參數(shù)沒有設(shè)好,到時你就會限入“無限回滾”(因為默認這兩個值是-1,代表無限嘗試)的狀態(tài),最后把jvm給搞爆掉。
rollback中對于帳戶檢查完后如果沒有問題那么接下來要做的就是把freezed_amount-要轉(zhuǎn)帳額還原到原來的freezed_amount,并把余額還原回操作前的值即可
下面給出DAO的詳細代碼,DAO代碼很簡單,沒什么需要多說的。
TransferMoneyDAO.java
package?org.sky.tcc.bank.icbc.dao;import?org.sky.tcc.bean.AccountBean;public?interface?TransferMoneyDAO?{public?void?addAccount(AccountBean?account)?throws?Exception;public?int?updateAmount(AccountBean?account)?throws?Exception;public?AccountBean?getAccount(String?accountNo)?throws?Exception;public?AccountBean?getAccountForUpdate(String?accountNo)?throws?Exception;public?int?updateFreezedAmount(AccountBean?account)?throws?Exception; }TransferMoneyDAOImpl.java
package?org.sky.tcc.bank.icbc.dao;import?java.sql.ResultSet; import?java.sql.SQLException;import?org.sky.tcc.bean.AccountBean; import?org.sky.tcc.dao.BaseDAO; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.jdbc.core.JdbcTemplate; import?org.springframework.jdbc.core.RowMapper; import?org.springframework.stereotype.Component;@Component public?class?TransferMoneyDAOImpl?extends?BaseDAO?implements?TransferMoneyDAO?{@Autowiredprivate?JdbcTemplate?fromJdbcTemplate;@Overridepublic?void?addAccount(AccountBean?account)?throws?Exception?{String?sql?=?"insert?into?bank_account(account_id,amount,freezed_amount)?values(?,?,?)";fromJdbcTemplate.update(sql,?account.getAccountId(),?account.getAmount(),?account.getFreezedAmount());}@Overridepublic?int?updateAmount(AccountBean?account)?throws?Exception?{String?sql?=?"update?bank_account?set?amount=?,?freezed_amount=??where?account_id=?";int?result?=?0;result?=?fromJdbcTemplate.update(sql,?account.getAmount(),?account.getFreezedAmount(),?account.getAccountId());return?result;}@Overridepublic?AccountBean?getAccount(String?accountNo)?throws?Exception?{String?sql?=?"select?account_id,amount,freezed_amount?from?bank_account?where?account_id=?";AccountBean?account?=?null;//?Object[]?params?=?new?Object[]?{?accountNo?};try?{account?=?fromJdbcTemplate.queryForObject(sql,?new?RowMapper<AccountBean>()?{@Overridepublic?AccountBean?mapRow(ResultSet?rs,?int?rowNum)?throws?SQLException?{AccountBean?account?=?new?AccountBean();account.setAccountId(rs.getString("account_id"));account.setAmount(rs.getDouble("amount"));account.setFreezedAmount(rs.getDouble("freezed_amount"));return?account;}},?accountNo);}?catch?(Exception?e)?{logger.error("getAccount?error:?"?+?e.getMessage(),?e);account?=?null;}return?account;}@Overridepublic?AccountBean?getAccountForUpdate(String?accountNo)?throws?Exception?{String?sql?=?"select?account_id,amount,freezed_amount?from?bank_account?where?account_id=??for?update";AccountBean?account?=?null;//?Object[]?params?=?new?Object[]?{?accountNo?};try?{account?=?fromJdbcTemplate.queryForObject(sql,?new?RowMapper<AccountBean>()?{@Overridepublic?AccountBean?mapRow(ResultSet?rs,?int?rowNum)?throws?SQLException?{AccountBean?account?=?new?AccountBean();account.setAccountId(rs.getString("account_id"));account.setAmount(rs.getDouble("amount"));account.setFreezedAmount(rs.getDouble("freezed_amount"));return?account;}},?accountNo);}?catch?(Exception?e)?{logger.error("getAccount?error:?"?+?e.getMessage(),?e);return?null;}return?account;}@Overridepublic?int?updateFreezedAmount(AccountBean?account)?throws?Exception?{String?sql?=?"update?bank_account?set?freezed_amount=??where?account_id=?";int?result?=?0;result?=?fromJdbcTemplate.update(sql,?account.getFreezedAmount(),?account.getAccountId());return?result;}}用于啟用動的ICBCApplication。
此處因為我們用了.xml模式配置dubbo,因此可就不能使用@EnableDubbo了啊!
tcc-bank-cmb工程
這是一個“招行打款”的dubbo provider,它和前面的工行扣款類似,也是實現(xiàn)了TCC的提交方式,只不過它要做的是“增加余額操作”。
其它邏輯和tcc-bank-icbc一樣,我們在此看一下它的三個TCC吧。
PlusMoneyActionImpl.java
package?org.sky.tcc.bank.cmb.dubbo;import?org.sky.service.BaseService; import?org.sky.tcc.bank.cmb.dao.TransferMoneyDAO; import?org.sky.tcc.bean.AccountBean; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.transaction.TransactionStatus; import?org.springframework.transaction.support.TransactionCallback; import?org.springframework.transaction.support.TransactionTemplate;import?io.seata.core.context.RootContext; import?io.seata.rm.tcc.api.BusinessActionContext;public?class?PlusMoneyActionImpl?extends?BaseService?implements?PlusMoneyAction?{@Autowiredprivate?TransactionTemplate?transactionTemplate;@Autowiredprivate?TransferMoneyDAO?transferMoneyDAO;@Overridepublic?String?sayHello()?throws?RuntimeException?{return?"hi?I?am?cmb-dubbo";}@Overridepublic?boolean?prepareAdd(BusinessActionContext?businessActionContext,?String?accountNo,?double?amount)?{logger.info("======>inti?prepare?add");final?String?xid?=?RootContext.getXID();return?transactionTemplate.execute(new?TransactionCallback<Boolean>()?{@Overridepublic?Boolean?doInTransaction(TransactionStatus?status)?{try?{//?校驗賬戶AccountBean?account?=?transferMoneyDAO.getAccountForUpdate(accountNo);if?(account?==?null)?{logger.info("======>prepareAdd:?賬戶["?+?accountNo?+?"]不存在,?txId:"?+?businessActionContext.getXid());return?false;}//?待轉(zhuǎn)入資金作為?不可用金額double?freezedAmount?=?account.getFreezedAmount()?+?amount;account.setFreezedAmount(freezedAmount);transferMoneyDAO.updateFreezedAmount(account);logger.info(String.format("PlusMoneyActionImpl.prepareAdd?account[%s]?amount[%f],?dtx?transaction?id:?%s.",?accountNo,amount,?xid));return?true;}?catch?(Throwable?t)?{logger.error("======>error?occured?in?PlusMoneyActionImpl.prepareAdd:?"?+?t.getMessage(),t.getCause());status.setRollbackOnly();return?false;}}});}@Overridepublic?boolean?commit(BusinessActionContext?businessActionContext)?{logger.info("======>into?PlusMoneyActionImpl.commit()?method");//?分布式事務(wù)IDfinal?String?xid?=?RootContext.getXID();//?賬戶IDfinal?String?accountNo?=?String.valueOf(businessActionContext.getActionContext("accountNo"));//?轉(zhuǎn)出金額final?double?amount?=?Double.valueOf(String.valueOf(businessActionContext.getActionContext("amount")));return?transactionTemplate.execute(new?TransactionCallback<Boolean>()?{@Overridepublic?Boolean?doInTransaction(TransactionStatus?status)?{try?{AccountBean?account?=?transferMoneyDAO.getAccountForUpdate(accountNo);//?加錢double?newAmount?=?account.getAmount()?+?amount;account.setAmount(newAmount);//?凍結(jié)金額?清除account.setFreezedAmount(account.getFreezedAmount()?-?amount);transferMoneyDAO.updateAmount(account);logger.info(String.format("======>add?account[%s]?amount[%f],?dtx?transaction?id:?%s.",?accountNo,amount,?xid));return?true;}?catch?(Throwable?t)?{logger.error("======>error?occured?in?PlusMoneyActionImpl.commit:?"?+?t.getMessage(),?t.getCause());status.setRollbackOnly();return?false;}}});}@Overridepublic?boolean?rollback(BusinessActionContext?businessActionContext)?{logger.info("======>into?PlusMoneyActionImpl.rollback()?method");//?分布式事務(wù)IDfinal?String?xid?=?RootContext.getXID();//?賬戶IDfinal?String?accountNo?=?String.valueOf(businessActionContext.getActionContext("accountNo"));//?轉(zhuǎn)出金額final?double?amount?=?Double.valueOf(String.valueOf(businessActionContext.getActionContext("amount")));return?transactionTemplate.execute(new?TransactionCallback<Boolean>()?{@Overridepublic?Boolean?doInTransaction(TransactionStatus?status)?{try?{AccountBean?account?=?transferMoneyDAO.getAccountForUpdate(accountNo);if?(account?==?null)?{//?賬戶不存在,?無需回滾動作return?true;}//?凍結(jié)金額?清除if?(account.getFreezedAmount()?>=?amount)?{account.setFreezedAmount(account.getFreezedAmount()?-?amount);transferMoneyDAO.updateFreezedAmount(account);}logger.info(String.format("======>Undo?account[%s]?amount[%f],?dtx?transaction?id:?%s.",?accountNo,amount,?xid));return?true;}?catch?(Throwable?t)?{logger.error("======>error?occured?in?PlusMoneyActionImpl.rollback:?"?+?t.getMessage(),t.getCause());status.setRollbackOnly();return?false;}}});}}prepareAdd階段階段,檢查帳戶如果有異常拋出RuntimeEcxeption讓SEATA觸發(fā)回滾(業(yè)務(wù)回滾+事務(wù)回滾)。如果無誤那么把中間狀態(tài) freezed_amount+轉(zhuǎn)入額。
commit階段,帳戶余額+轉(zhuǎn)入金融,接著把freezed_amount-轉(zhuǎn)入額度還原到原來的值。
rollback階段,和minusMoneyAction邏輯一樣,把凍結(jié)款-轉(zhuǎn)帳額再把余額回退到操作前的值。
pom.xml
<project?xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.sky.demo</groupId><artifactId>nacos-parent</artifactId><version>0.0.1-SNAPSHOT</version></parent><groupId>org.sky.demo</groupId><artifactId>tcc-bank-cmb</artifactId><version>0.0.1</version><name>tcc-bank-cmb</name><description>Demo?project?Dubbo+Nacos+SeataTCC</description><properties><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding></properties><dependencies><dependency><groupId>org.mybatis</groupId><artifactId>mybatis</artifactId></dependency><dependency><groupId>org.mybatis</groupId><artifactId>mybatis-spring</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-core</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-spring</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--?Dubbo?Registry?Nacos?--><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-registry-nacos</artifactId></dependency><dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId></dependency><dependency><groupId>org.sky.demo</groupId><artifactId>skycommon</artifactId><version>${skycommon.version}</version></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId></dependency><dependency><groupId>com.alibaba.boot</groupId><artifactId>nacos-config-spring-boot-starter</artifactId><exclusions><exclusion><artifactId>nacos-client</artifactId><groupId>com.alibaba.nacos</groupId></exclusion></exclusions></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies><build><finalName>${project.artifactId}</finalName><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><jvmArguments>-Dfile.encoding=UTF-8</jvmArguments></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-war-plugin</artifactId><version>2.6</version><configuration><failOnMissingWebXml>false</failOnMissingWebXml></configuration></plugin></plugins><resources><resource><directory>src/main/resources</directory><excludes><exclude>application*.properties</exclude></excludes></resource><resource><directory>src/main/webapp</directory><targetPath>META-INF/resources</targetPath><includes><include>**/**</include></includes></resource><resource><directory>src/main/resources</directory><filtering>true</filtering><includes><include>application.properties</include></includes></resource></resources></build> </project>application.properties
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource spring.datasource.driverClassName=com.mysql.jdbc.Driver spring.datasource.url=jdbc:mysql://192.168.56.101:3306/bank_cmb?useUnicode=true&characterEncoding=utf-8&useSSL=false spring.datasource.username=cmb spring.datasource.password=111111 spring.datasource.initialize=false spring.datasource.initialSize=5 spring.datasource.minIdle=5 spring.datasource.maxActive:?20 spring.datasource.maxWait:?30000 spring.datasource.validationQuery=SELECT?1?FROM?DUAL spring.datasource.testWhileIdle=true spring.datasource.testOnBorrow=false spring.datasource.testOnReturn=false spring.datasource.poolPreparedStatements=true spring.datasource.maxPoolPreparedStatementPerConnectionSize=128 logging.config=classpath:log4j2.xmlspring/dubbo-bean.xml
<?xml?version="1.0"?encoding="UTF-8"?> <!--?~?Copyright?1999-2018?Alibaba?Group?Holding?Ltd.?~?~?Licensed?under?the?Apache?License,?Version?2.0?(the?"License");?~?you?may?not?use?this?file?except?in?compliance?with?the?License.?~?You?may?obtain?a?copy?of?the?License?at?~?~?http://www.apache.org/licenses/LICENSE-2.0?~?~?Unless?required?by?applicable?law?or?agreed?to?in?writing,?software?~?distributed?under?the?License?is?distributed?on?an?"AS?IS"?BASIS,?~?WITHOUT?WARRANTIES?OR?CONDITIONS?OF?ANY?KIND,?either?express?or?implied.?~?See?the?License?for?the?specific?language?governing?permissions?and?~?limitations?under?the?License.?--> <beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsdhttp://code.alibabatech.com/schema/dubbohttp://code.alibabatech.com/schema/dubbo/dubbo.xsd"default-autowire="byName"><dubbo:application?name="tcc-bank-cmb"?/><!--使用?zookeeper?注冊中心暴露服務(wù),注意要先開啟?zookeeper?--><dubbo:registry?address="nacos://192.168.56.101:8848"?/><!--<transfer:registry?address="multicast://224.5.6.7:1234?unicast=false"?/>?--><dubbo:protocol?name="dubbo"?port="29990"?/><dubbo:provider?timeout="30000"?threads="10"threadpool="fixed"?/><!--?第一個TCC?參與者服務(wù)發(fā)布?--><dubbo:serviceinterface="org.sky.tcc.bank.cmb.dubbo.PlusMoneyAction"timeout="30000"?ref="plusMoneyActionImpl"?/><bean?name="plusMoneyActionImpl"class="org.sky.tcc.bank.cmb.dubbo.PlusMoneyActionImpl"?/> </beans>spring/spirng-bean.xml
<?xml?version="1.0"?encoding="UTF-8"?> <!--?~?Copyright?1999-2018?Alibaba?Group?Holding?Ltd.?~?~?Licensed?under?the?Apache?License,?Version?2.0?(the?"License");?~?you?may?not?use?this?file?except?in?compliance?with?the?License.?~?You?may?obtain?a?copy?of?the?License?at?~?~?http://www.apache.org/licenses/LICENSE-2.0?~?~?Unless?required?by?applicable?law?or?agreed?to?in?writing,?software?~?distributed?under?the?License?is?distributed?on?an?"AS?IS"?BASIS,?~?WITHOUT?WARRANTIES?OR?CONDITIONS?OF?ANY?KIND,?either?express?or?implied.?~?See?the?License?for?the?specific?language?governing?permissions?and?~?limitations?under?the?License.?--> <beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsdhttp://code.alibabatech.com/schema/dubbohttp://code.alibabatech.com/schema/dubbo/dubbo.xsd"default-autowire="byName"><bean?id="transactionTemplate"class="org.springframework.transaction.support.TransactionTemplate"><property?name="propagationBehaviorName"><value>PROPAGATION_REQUIRES_NEW</value></property><property?name="transactionManager"><ref?bean="transactionManager"?/></property></bean> </beans>自動裝配用TccBankConfig.java,這邊要注意的是此處的GlobalTransaction里的名字可必須是tcc-bank-cmb啦,不要復(fù)制粘貼后忘改了。
TccBankConfig.java
package?org.sky.tcc.bank.cmb.config;import?org.springframework.boot.context.properties.ConfigurationProperties; import?org.springframework.context.annotation.Bean; import?org.springframework.context.annotation.Configuration; import?org.springframework.context.annotation.ImportResource; import?org.springframework.jdbc.core.JdbcTemplate; import?org.springframework.jdbc.datasource.DataSourceTransactionManager;import?com.alibaba.druid.pool.DruidDataSource;import?io.seata.spring.annotation.GlobalTransactionScanner;@Configuration @ImportResource(locations?=?{?"spring/spring-bean.xml",?"spring/dubbo-bean.xml"?}) public?class?TccBankConfig?{@Bean@ConfigurationProperties(prefix?=?"spring.datasource")public?DruidDataSource?druidDataSource()?{return?new?DruidDataSource();}@Beanpublic?DataSourceTransactionManager?transactionManager(DruidDataSource?druidDataSource)?{return?new?DataSourceTransactionManager(druidDataSource);}@Beanpublic?JdbcTemplate?jdbcTemplate(DruidDataSource?druidDataSource)?{return?new?JdbcTemplate(druidDataSource);}@Beanpublic?GlobalTransactionScanner?globalTransactionScanner()?{return?new?GlobalTransactionScanner("tcc-bank-cmb",?"demo-tx-grp");}}TransferMoneyDAO.java
package?org.sky.tcc.bank.cmb.dao;import?org.sky.tcc.bean.AccountBean;public?interface?TransferMoneyDAO?{public?void?addAccount(AccountBean?account)?throws?Exception;public?int?updateAmount(AccountBean?account)?throws?Exception;public?AccountBean?getAccount(String?accountNo)?throws?Exception;public?AccountBean?getAccountForUpdate(String?accountNo)?throws?Exception;public?int?updateFreezedAmount(AccountBean?account)?throws?Exception; }TransferMoneyDAOImpl.java
package?org.sky.tcc.bank.cmb.dao;import?java.sql.ResultSet; import?java.sql.SQLException; import?org.sky.tcc.bean.AccountBean; import?org.sky.tcc.dao.BaseDAO; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.jdbc.core.JdbcTemplate; import?org.springframework.jdbc.core.RowMapper; import?org.springframework.stereotype.Component;@Component public?class?TransferMoneyDAOImpl?extends?BaseDAO?implements?TransferMoneyDAO?{@Autowiredprivate?JdbcTemplate?toJdbcTemplate;@Overridepublic?void?addAccount(AccountBean?account)?throws?Exception?{String?sql?=?"insert?into?bank_account(account_id,amount,freezed_amount)?values(?,?,?)";toJdbcTemplate.update(sql,?account.getAccountId(),?account.getAmount(),?account.getFreezedAmount());}@Overridepublic?int?updateAmount(AccountBean?account)?throws?Exception?{String?sql?=?"update?bank_account?set?amount=?,?freezed_amount=??where?account_id=?";int?result?=?0;result?=?toJdbcTemplate.update(sql,?account.getAmount(),?account.getFreezedAmount(),?account.getAccountId());return?result;}@Overridepublic?AccountBean?getAccount(String?accountNo)?throws?Exception?{String?sql?=?"select?account_id,amount,freezed_amount?from?bank_account?where?account_id=?";AccountBean?account?=?null;//?Object[]?params?=?new?Object[]?{?accountNo?};try?{account?=?toJdbcTemplate.queryForObject(sql,?new?RowMapper<AccountBean>()?{@Overridepublic?AccountBean?mapRow(ResultSet?rs,?int?rowNum)?throws?SQLException?{AccountBean?account?=?new?AccountBean();account.setAccountId(rs.getString("account_id"));account.setAmount(rs.getDouble("amount"));account.setFreezedAmount(rs.getDouble("freezed_amount"));return?account;}},?accountNo);}?catch?(Exception?e)?{logger.error("getAccount?error:?"?+?e.getMessage(),?e);account?=?null;}return?account;}@Overridepublic?AccountBean?getAccountForUpdate(String?accountNo)?throws?Exception?{String?sql?=?"select?account_id,amount,freezed_amount?from?bank_account?where?account_id=??for?update";AccountBean?account?=?null;//?Object[]?params?=?new?Object[]?{?accountNo?};try?{account?=?toJdbcTemplate.queryForObject(sql,?new?RowMapper<AccountBean>()?{@Overridepublic?AccountBean?mapRow(ResultSet?rs,?int?rowNum)?throws?SQLException?{AccountBean?account?=?new?AccountBean();account.setAccountId(rs.getString("account_id"));account.setAmount(rs.getDouble("amount"));account.setFreezedAmount(rs.getDouble("freezed_amount"));return?account;}},?accountNo);}?catch?(Exception?e)?{logger.error("getAccount?error:?"?+?e.getMessage(),?e);account?=?null;}return?account;}@Overridepublic?int?updateFreezedAmount(AccountBean?account)?throws?Exception?{String?sql?=?"update?bank_account?set?freezed_amount=??where?account_id=?";int?result?=?0;result?=?toJdbcTemplate.update(sql,?account.getFreezedAmount(),?account.getAccountId());return?result;}}用于啟用動的CMBApplication,此處因為我們用了.xml模式配置dubbo,因此可就不能使用@EnableDubbo了哦再次提醒一次。
CMBApplication.java
package?org.sky.tcc.bank.cmb;import?org.springframework.boot.SpringApplication; import?org.springframework.boot.autoconfigure.EnableAutoConfiguration; import?org.springframework.context.annotation.ComponentScan;@EnableAutoConfiguration @ComponentScan(basePackages?=?{?"org.sky.tcc.bank"?}) public?class?CMBApplication?{public?static?void?main(String[]?args)?{SpringApplication.run(CMBApplication.class,?args);}}到此為止兩個dubbo provider制作完成,我們把它們分別運行起來。
啟動之前我放出此次在生產(chǎn)環(huán)境調(diào)整過的nacos-config.txt文件,你只要在nacos服務(wù)啟動的情況下重新在seata/conf下就可以了。?
./nacos-config.sh?localhostseata/conf/nacos-config.txt
transport.type=TCP transport.server=NIO transport.heartbeat=true transport.thread-factory.boss-thread-prefix=NettyBoss transport.thread-factory.worker-thread-prefix=NettyServerNIOWorker transport.thread-factory.server-executor-thread-prefix=NettyServerBizHandler transport.thread-factory.share-boss-worker=false transport.thread-factory.client-selector-thread-prefix=NettyClientSelector transport.thread-factory.client-selector-thread-size=1 transport.thread-factory.client-worker-thread-prefix=NettyClientWorkerThread transport.thread-factory.boss-thread-size=1 transport.thread-factory.worker-thread-size=8 transport.shutdown.wait=3 service.vgroup_mapping.demo-tx-grp=default service.default.grouplist=192.168.56.101:8091 service.enableDegrade=false service.disable=false service.max.commit.retry.timeout=10000 service.max.rollback.retry.timeout=3 client.async.commit.buffer.limit=10000 client.lock.retry.internal=3 client.lock.retry.times=3 client.lock.retry.policy.branch-rollback-on-conflict=true client.table.meta.check.enable=true client.report.retry.count=1 client.tm.commit.retry.count=1 client.tm.rollback.retry.count=1 store.mode=db store.file.dir=file_store/data store.file.max-branch-session-size=16384 store.file.max-global-session-size=512 store.file.file-write-buffer-cache-size=16384 store.file.flush-disk-mode=async store.file.session.reload.read_size=100 store.db.datasource=druid store.db.db-type=mysql store.db.driver-class-name=com.mysql.jdbc.Driver store.db.url=jdbc:mysql://192.168.56.101:3306/seata?useUnicode=true store.db.user=seata store.db.password=111111 store.db.min-conn=1 store.db.max-conn=3 store.db.global.table=global_table store.db.branch.table=branch_table store.db.query-limit=100 store.db.lock-table=lock_table recovery.committing-retry-period=1000 recovery.asyn-committing-retry-period=1000 recovery.rollbacking-retry-period=1000 recovery.timeout-retry-period=1000 transaction.undo.data.validation=true transaction.undo.log.serialization=jackson transaction.undo.log.save.days=1 transaction.undo.log.delete.period=86400000 transaction.undo.log.table=undo_log transport.serialization=seata transport.compressor=none metrics.enabled=false metrics.registry-type=compact metrics.exporter-list=prometheus metrics.exporter-prometheus-port=9898 support.spring.datasource.autoproxy=falsetcc-bank-icbc運行在28880端口;
tcc-bank-cmb運行在29990端口;
tcc-money-king工程
為了全真模擬生產(chǎn),我們制作了一個spring boot的consumer,在這個工程里我們依然使用springboot+xml配置混合的方式,關(guān)鍵在該工程的業(yè)務(wù)方法內(nèi),我們看下去。
pom.xml
<project?xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.sky.demo</groupId><artifactId>nacos-parent</artifactId><version>0.0.1-SNAPSHOT</version></parent><groupId>org.sky.demo</groupId><artifactId>tcc-money-king</artifactId><version>0.0.1</version><packaging>war</packaging><description>Demo?project?Dubbo+Nacos+SeataTCC?Consumer</description><dependencies><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-core</artifactId><scope>test</scope></dependency><dependency><groupId>org.spockframework</groupId><artifactId>spock-spring</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-tomcat</artifactId><scope>compile</scope></dependency><dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId></dependency><dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><!--?Dubbo?Registry?Nacos?--><dependency><groupId>org.apache.dubbo</groupId><artifactId>dubbo-registry-nacos</artifactId></dependency><dependency><groupId>com.alibaba.nacos</groupId><artifactId>nacos-client</artifactId></dependency><dependency><groupId>org.sky.demo</groupId><artifactId>skycommon</artifactId><version>${skycommon.version}</version></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId></dependency><dependency><groupId>com.alibaba.boot</groupId><artifactId>nacos-config-spring-boot-starter</artifactId><exclusions><exclusion><artifactId>nacos-client</artifactId><groupId>com.alibaba.nacos</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>javax.servlet</groupId><artifactId>javax.servlet-api</artifactId><version>${javax.servlet.version}</version><scope>provided</scope></dependency></dependencies><build><sourceDirectory>src/main/java</sourceDirectory><testSourceDirectory>src/test/java</testSourceDirectory><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/webapp</directory><targetPath>META-INF/resources</targetPath><includes><include>**/**</include></includes></resource><resource><directory>src/main/resources</directory><filtering>true</filtering><includes><include>application.properties</include><include>application-${profileActive}.properties</include></includes></resource></resources></build> </project>application.properties
spring/dubbo-reference.xml
<?xml?version="1.0"?encoding="UTF-8"?> <!--?~?Copyright?1999-2018?Alibaba?Group?Holding?Ltd.?~?~?Licensed?under?the?Apache?License,?Version?2.0?(the?"License");?~?you?may?not?use?this?file?except?in?compliance?with?the?License.?~?You?may?obtain?a?copy?of?the?License?at?~?~?http://www.apache.org/licenses/LICENSE-2.0?~?~?Unless?required?by?applicable?law?or?agreed?to?in?writing,?software?~?distributed?under?the?License?is?distributed?on?an?"AS?IS"?BASIS,?~?WITHOUT?WARRANTIES?OR?CONDITIONS?OF?ANY?KIND,?either?express?or?implied.?~?See?the?License?for?the?specific?language?governing?permissions?and?~?limitations?under?the?License.?--> <beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsdhttp://code.alibabatech.com/schema/dubbohttp://code.alibabatech.com/schema/dubbo/dubbo.xsd"default-autowire="byName"><dubbo:application?name="tcc-bank-sample"><dubbo:parameter?key="qos.enable"?value="false"?/></dubbo:application><!--使用?zookeeper?注冊中心暴露服務(wù),注意要先開啟?zookeeper?--><dubbo:registry?address="nacos://192.168.56.101:8848"?/><!--<transfer:registry?address="multicast://224.5.6.7:1234?unicast=false"?/>?--><!--?第一個TCC參與者?服務(wù)訂閱?--><dubbo:reference?id="minusMoneyAction"interface="org.sky.tcc.bank.icbc.dubbo.MinusMoneyAction"check="false"?lazy="true"?/><!--?第二個TCC參與者?服務(wù)訂閱?--><dubbo:reference?id="plusMoneyAction"interface="org.sky.tcc.bank.cmb.dubbo.PlusMoneyAction"check="false"?lazy="true"?/></beans>spring boot自動注解用SeataAutoConfig.java
package?org.sky.tcc.moneyking.config;import?io.seata.spring.annotation.GlobalTransactionScanner;import?org.springframework.context.annotation.Bean; import?org.springframework.context.annotation.Configuration; import?org.springframework.context.annotation.ImportResource;@Configuration @ImportResource(locations?=?{?"spring/dubbo-reference.xml"?}) public?class?SeataAutoConfig?{@Beanpublic?GlobalTransactionScanner?globalTransactionScanner()?{return?new?GlobalTransactionScanner("tcc-bank-sample",?"demo-tx-grp");} }這邊的GlobalTRansactionScanner里的第一個參數(shù)可就是事務(wù)邊界了啊,注意這邊的事務(wù)group必須和SEATA端的nacos-config.txt內(nèi)配置的完全一致。
TccMoneyKingBizService.java
package?org.sky.tcc.moneyking.service.biz;import?org.sky.exception.DemoRpcRunTimeException;public?interface?TccMoneyKingBizService?{public?boolean?transfer(String?from,?String?to,?double?amount)?throws?DemoRpcRunTimeException; }核心業(yè)務(wù)方法TccMoneyKingBizServiceImpl.java
package?org.sky.tcc.moneyking.service.biz;import?org.sky.exception.DemoRpcRunTimeException; import?org.sky.service.BaseService; import?org.sky.tcc.bank.cmb.dubbo.PlusMoneyAction; import?org.sky.tcc.bank.icbc.dubbo.MinusMoneyAction; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.stereotype.Service;import?io.seata.spring.annotation.GlobalTransactional;@Service public?class?TccMoneyKingBizServiceImpl?extends?BaseService?implements?TccMoneyKingBizService?{@Autowiredprivate?MinusMoneyAction?minusMoneyAction;@Autowiredprivate?PlusMoneyAction?plusMoneyAction;@Override@GlobalTransactional(timeoutMills?=?300000,?name?=?"tcc-bank-sample")public?boolean?transfer(String?from,?String?to,?double?amount)?throws?DemoRpcRunTimeException?{boolean?answer?=?minusMoneyAction.prepareMinus(null,?from,?amount);if?(!answer)?{//?扣錢參與者,一階段失敗;?回滾本地事務(wù)和分布式事務(wù)throw?new?DemoRpcRunTimeException("賬號:["?+?from?+?"]?預(yù)扣款失敗");}//?加錢參與者,一階段執(zhí)行answer?=?plusMoneyAction.prepareAdd(null,?to,?amount);if?(!answer)?{throw?new?DemoRpcRunTimeException("賬號:["?+?to?+?"]?預(yù)收款失敗");}return?true;}}這邊可以看到是如何調(diào)用icbc的扣款和cmb的打款動作的,這邊根本不需要你再去寫什么commit和rollback,只要這兩個dubbo provider中的prepare方法執(zhí)行正常,SEATA就會自動回調(diào)icbc和cmb中的commit方法;只要icbc或者是cmb中有任何一步拋錯,就會觸發(fā)這兩個provider中的業(yè)務(wù)回滾rollback方法。
MonekyKingController.java
package?org.sky.tcc.moneyking.controller;import?java.util.HashMap; import?java.util.Map;import?org.sky.controller.BaseController; import?org.sky.tcc.bean.AccountBean; import?org.sky.tcc.moneyking.service.biz.TccMoneyKingBizService; import?org.springframework.beans.factory.annotation.Autowired; import?org.springframework.http.HttpHeaders; import?org.springframework.http.HttpStatus; import?org.springframework.http.MediaType; import?org.springframework.http.ResponseEntity; import?org.springframework.web.bind.annotation.PostMapping; import?org.springframework.web.bind.annotation.RequestBody; import?org.springframework.web.bind.annotation.RequestMapping; import?org.springframework.web.bind.annotation.RestController;import?com.alibaba.fastjson.JSON; import?com.alibaba.fastjson.JSONObject;@RestController @RequestMapping("moneyking") public?class?MonekyKingController?extends?BaseController?{@Autowiredprivate?TccMoneyKingBizService?tccMoneyKingBizService;@PostMapping(value?=?"/transfermoney",?produces?=?"application/json")public?ResponseEntity<String>?transferMoney(@RequestBody?String?params)?throws?Exception?{ResponseEntity<String>?response?=?null;String?returnResultStr;HttpHeaders?headers?=?new?HttpHeaders();headers.setContentType(MediaType.APPLICATION_JSON_UTF8);Map<String,?Object>?result?=?new?HashMap<>();try?{logger.info("input?params====="?+?params);JSONObject?requestJsonObj?=?JSON.parseObject(params);Map<String,?AccountBean>?acctMap?=?getAccountFromJson(requestJsonObj);AccountBean?acctFrom?=?acctMap.get("account_from");AccountBean?acctTo?=?acctMap.get("account_to");boolean?answer?=?tccMoneyKingBizService.transfer(acctFrom.getAccountId(),?acctTo.getAccountId(),acctFrom.getAmount()); //????????????tccMoneyKingBizService.icbcHello(); //????????????tccMoneyKingBizService.cmbHello();result.put("account_from",?acctFrom.getAccountId());result.put("account_to",?acctTo.getAccountId());result.put("transfer_money",?acctFrom.getAmount());result.put("message",?"transferred?successfully");returnResultStr?=?JSON.toJSONString(result);logger.info("transfer?money?successfully======>\n"?+?returnResultStr);response?=?new?ResponseEntity<>(returnResultStr,?headers,?HttpStatus.OK);}?catch?(Exception?e)?{logger.error("transfer?money?with?error:?"?+?e.getMessage(),?e);result.put("message",?"transfer?money?with?error[?"?+?e.getMessage()?+?"]");returnResultStr?=?JSON.toJSONString(result);response?=?new?ResponseEntity<>(returnResultStr,?headers,?HttpStatus.EXPECTATION_FAILED);}return?response;} }用于啟動的MoneyKingApplication.java
package?org.sky.tcc.moneyking;import?org.apache.dubbo.config.spring.context.annotation.EnableDubbo; import?org.springframework.boot.SpringApplication; import?org.springframework.boot.autoconfigure.EnableAutoConfiguration; import?org.springframework.boot.web.servlet.ServletComponentScan; import?org.springframework.context.annotation.ComponentScan; import?org.springframework.transaction.annotation.EnableTransactionManagement;@ServletComponentScan @EnableAutoConfiguration @ComponentScan(basePackages?=?{?"org.sky"?})public?class?MoneyKingApplication?{public?static?void?main(String[]?args)?{SpringApplication.run(MoneyKingApplication.class,?args);}}把MoneyKingApplication啟動起來。
看,兩個dubbo provider已經(jīng)被SEATA納入托管。
測試案例
我們初始化兩個帳戶,一個叫a一個叫b。然后通過a給b每次打100塊錢。
use?bank_icbc; delete?from?bank_account; insert?into?bank_account (account_id,amount,freezed_amount)values('a',50000,0); use?bank_cmb; delete?from?bank_account; insert?into?bank_account (account_id,amount,freezed_amount)values('b',100,0);正常劃款?
請觀察icbc和cmb的后臺,從prepare為人為觸發(fā)外,commit的一系列的動作都是被自動觸發(fā)的。
再看數(shù)據(jù)庫端
非正常劃款
轉(zhuǎn)帳額大于余額
{"account_from"?:?"a","account_to"?:?"b","transfer_money"?:?1000000000 }來看icbc和cmb端的回滾
?看到?jīng)],rollback被自動觸發(fā)。數(shù)據(jù)庫端當然也沒被插進數(shù)據(jù)(被回滾掉了)。
帳戶不存在
{"account_from"?:?"a","account_to"?:?"c","transfer_money"?:?100 }總結(jié)
我們可以通過上述的例子看到,SEATA把分布式事務(wù)的鎖可以定義為最最小業(yè)務(wù)原子操作,這使得本來冗長的事務(wù)鎖的開銷可以盡量的小,盡快的釋放原子操作從而加速了分布式事務(wù)處理的效率。
SEATA通過數(shù)據(jù)一致性、盡可能少破壞業(yè)務(wù)代碼、高性能這三者關(guān)系中進行了一個取舍,它付的代價就是使用netty通訊實現(xiàn)了異步消息回調(diào)+spring aop,這個對服務(wù)器的硬件要求很高。當服務(wù)器的硬件如果跟不上的話,你會發(fā)現(xiàn)部署一個SEATA簡直是要了你的老命了,很多網(wǎng)上的網(wǎng)友也都說過,我部署了一個SEATA比原來竟然慢了8倍。這倒不是說這個框架不好,只是它的開銷會比較大。當然,在現(xiàn)今硬件越來越廉價的情況下,要保證數(shù)據(jù)的最終一致完整性,總要有適當?shù)母冻龅摹?/p>
福利掃描添加小編微信,備注“姓名+公司職位”,入駐【CSDN博客】,加入【云計算學(xué)習(xí)交流群】,和志同道合的朋友們共同打卡學(xué)習(xí)! 推薦閱讀:微服務(wù)架構(gòu)何去何從? 疫情肆虐下,程序員用代碼告訴你為什么千萬不要出門! 終于!孫宇晨和巴菲特吃上 3153 萬元的晚餐,還送了一個比特幣! 2020年AI如何走?Jeff Dean和其他四位“大神”已做預(yù)測! 為什么說 Julia 更優(yōu)于 Python? AI 醫(yī)生“戰(zhàn)疫”在前線 真香,朕在看了!總結(jié)
以上是生活随笔為你收集整理的GitHub 标星 14000+,阿里开源的 SEATA 如何应用到极致?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 12306 的技术扒光在你面前,100
- 下一篇: 终于,我也到了和 Eclipse 说再见