深度剖析Apache Shardingsphere对分布式事务的支持
Apache ShardingSphere 是一套開(kāi)源的分布式數(shù)據(jù)庫(kù)中間件解決方案組成的生態(tài)圈,它由 JDBC、Proxy 和 Sidecar(規(guī)劃中)這 3 款相互獨(dú)立,卻又能夠混合部署配合使用的產(chǎn)品組成。它們均提供標(biāo)準(zhǔn)化的數(shù)據(jù)分片、分布式事務(wù)和數(shù)據(jù)庫(kù)治理功能,可適用于如 Java 同構(gòu)、異構(gòu)語(yǔ)言、云原生等各種多樣化的應(yīng)用場(chǎng)景。ShardingSphere 已于2020年4月16日成為 Apache 軟件基金會(huì)的頂級(jí)項(xiàng)目。
分布式系統(tǒng)CAP理論
一致性(Consistency)
一致性指 all nodes see the same data at the same time,即更新操作成功并返回客戶(hù)端完成后,所有節(jié)點(diǎn)在同一時(shí)間的數(shù)據(jù)完全一致,不能存在中間狀態(tài)。
關(guān)于一致性,如果用戶(hù)時(shí)刻看到的數(shù)據(jù)都是一致的,那么稱(chēng)之為強(qiáng)一致性。如果允許存在中間狀態(tài),只要求經(jīng)過(guò)一段時(shí)間后,數(shù)據(jù)最終是一致的,則稱(chēng)之為最終一致性。此外,如果允許存在部分?jǐn)?shù)據(jù)不一致,那么就稱(chēng)之為弱一致性
可用性(Availability)
可用性是指系統(tǒng)提供的服務(wù)必須一直處于可用的狀態(tài),對(duì)于用戶(hù)的每一個(gè)操作請(qǐng)求總是能夠在有限的時(shí)間內(nèi)返回結(jié)果。有限的時(shí)間內(nèi)是指:對(duì)于用戶(hù)的一個(gè)操作請(qǐng)求,系統(tǒng)必須能夠在指定的時(shí)間內(nèi)返回對(duì)應(yīng)的處理結(jié)果,如果超過(guò)了這個(gè)時(shí)間范圍,那么系統(tǒng)就被認(rèn)為是不可用的。
返回結(jié)果是可用性的另一個(gè)非常重要的指標(biāo),它要求系統(tǒng)在完成對(duì)用戶(hù)請(qǐng)求的處理后,返回一個(gè)正常的響應(yīng)結(jié)果,不論這個(gè)結(jié)果是成功還是失敗。
分區(qū)容錯(cuò)性(Partition tolerance )
布式系統(tǒng)在遇到任何網(wǎng)絡(luò)分區(qū)故障的時(shí)候,仍然需要能夠保證對(duì)外提供滿(mǎn)足一致性和可用性的服務(wù),除非是整個(gè)網(wǎng)絡(luò)環(huán)境都發(fā)生了故障。
X/Open DTP模型與XA規(guī)范
X/Open,即現(xiàn)在的open group,是一個(gè)獨(dú)立的組織,主要負(fù)責(zé)制定各種行業(yè)技術(shù)標(biāo)準(zhǔn)。官網(wǎng)地址:http://www.opengroup.org/。X/Open組織主要由各大知名公司或者廠(chǎng)商進(jìn)行支持,這些組織不光遵循X/Open組織定義的行業(yè)技術(shù)標(biāo)準(zhǔn),也參與到標(biāo)準(zhǔn)的制定。下圖展示了open group目前主要成員(官網(wǎng)截圖):
DTP模型
應(yīng)用程序(Application Program ,簡(jiǎn)稱(chēng)AP):用于定義事務(wù)邊界(即定義事務(wù)的開(kāi)始和結(jié)束),并且在事務(wù)邊界內(nèi)對(duì)資源進(jìn)行操作。
資源管理器(Resource Manager,簡(jiǎn)稱(chēng)RM,一般也稱(chēng)為事務(wù)參與者):如數(shù)據(jù)庫(kù)、文件系統(tǒng)等,并提供訪(fǎng)問(wèn)資源的方式。
事務(wù)管理器(Transaction Manager ,簡(jiǎn)稱(chēng)TM,一般也稱(chēng)為事務(wù)協(xié)調(diào)者):負(fù)責(zé)分配事務(wù)唯一標(biāo)識(shí),監(jiān)控事務(wù)的執(zhí)行進(jìn)度,并負(fù)責(zé)事務(wù)的提交、回滾等。
XA規(guī)范
這里的接口規(guī)范特別多,我們只要來(lái)講講幾個(gè)最重要的。
xa_start : 在 RM端調(diào)用此接口開(kāi)啟一個(gè)XA事務(wù),后面需要接上XID 作為參數(shù)。
xa_end : 取消當(dāng)前線(xiàn)程與事務(wù)的關(guān)聯(lián), 與 xa_start是配對(duì)使用。
xa_prepare : 詢(xún)問(wèn)RM 是否已經(jīng)準(zhǔn)備好了提交事務(wù)。
xa_commit : 通知RM 提交事務(wù)分支。
xa_rollback : 通知RM 提交回滾事務(wù)分支。
XA二階段提交
階段一 :TM通知各個(gè)RM準(zhǔn)備提交它們的事務(wù)分支。如果RM判斷自己進(jìn)行的工作可以被提交,那就就對(duì)工作內(nèi)容進(jìn)行持久化,再給TM肯定答復(fù);要是發(fā)生了其他情況,那給TM的都是否定答復(fù)。在發(fā)送了否定答復(fù)并回滾了已經(jīng)的工作后,RM就可以丟棄這個(gè)事務(wù)分支信息。
階段二 :TM根據(jù)階段1各個(gè)RM prepare的結(jié)果,決定是提交還是回滾事務(wù)。如果所有的RM都prepare成功,那么TM通知所有的RM進(jìn)行提交;如果有RM prepare失敗的話(huà),則TM通知所有RM回滾自己的事務(wù)分支。
MySQL對(duì)XA協(xié)議的支持
MySQL 從5.0.3開(kāi)始支持XA分布式事務(wù),且只有InnoDB存儲(chǔ)引擎支持XA事務(wù)。MySQL 在DTP模型中也是屬于資源管理器RM。
MySQL XA 事務(wù)的 SQL語(yǔ)法
XA?START?xid????//開(kāi)啟XA事務(wù),xid是一個(gè)唯一值,表示事務(wù)分支標(biāo)識(shí)符 XA?END?xid??//結(jié)束一個(gè)XA事務(wù), XA?PREPARE?xid?準(zhǔn)備提交 XA COMMIT xid [ONE PHASE]?//提交事務(wù)。兩階段提交協(xié)議中,如果只有一個(gè)RM參與,那么可以?xún)?yōu)化為一階段提交 XA?ROLLBACK?xid??//回滾 XA?RECOVER?[CONVERT?XID]??//列出所有處于PREPARE階段的XA事務(wù)MySQL xid詳解
mysql中使用xid來(lái)作為一個(gè)事務(wù)分支的標(biāo)識(shí)符。通過(guò)C語(yǔ)言進(jìn)行描述,如下:
/? ??Transaction?branch?identification:?XID?and?NULLXID: ?/ #define?XIDDATASIZE?128??/??size?in?bytes??/ #define?MAXGTRIDSIZE?64??/??maximum?size?in?bytes?of?gtrid??/ #define?MAXBQUALSIZE?64??/??maximum?size?in?bytes?of?bqual??/ struct?xid_t?{long?formatID;?????/*?format?identifier?*/long?gtrid_length;?/*?value?1-64?*/long?bqual_length;?/*?value?1-64?*/char?data[XIDDATASIZE];}; /? ??A?value?of?-1?in?formatID?means?that?the?XID?is?null. ?/ typedef?struct?xid_t?XID; /? ??Declarations?of?routines?by?which?RMs?call?TMs: ?/ extern?int?ax_reg(int,?XID??,?long); extern?int?ax_unreg(int,?long);gtrid :全局事務(wù)標(biāo)識(shí)符(global transaction identifier),最大不能超過(guò)64字節(jié)。
bqual :分支限定符(branch qualifier),最大不能超過(guò)64字節(jié)。
formatId:記錄gtrid、bqual的格式,類(lèi)似于memcached中flags字段的作用。
data :xid的值,其是 gtrid和bqual拼接后的內(nèi)容。。
MySQL XA事務(wù)狀態(tài)
JTA規(guī)范
JTA(Java Transaction API):為J2EE平臺(tái)提供了分布式事務(wù)服務(wù)(distributed transaction)的能力。某種程度上,可以認(rèn)為JTA規(guī)范是XA規(guī)范的Java版,其把XA規(guī)范中規(guī)定的DTP模型交互接口抽象成Java接口中的方法,并規(guī)定每個(gè)方法要實(shí)現(xiàn)什么樣的功能。
JTA 定義的接口
javax.transaction.TransactionManager : 事務(wù)管理器,負(fù)責(zé)事務(wù)的begin, commit,rollback 等命令。
javax.transaction.UserTransaction:用于聲明一個(gè)分布式事務(wù)。
javax.transaction.TransactionSynchronizationRegistry:事務(wù)同步注冊(cè)
javax.transaction.xa.XAResource:定義RM提供給TM操作的接口
javax.transaction.xa.Xid:事務(wù)xid接口。
TM provider:
實(shí)現(xiàn)UserTransaction、TransactionManager、Transaction、TransactionSynchronizationRegistry、Synchronization、Xid接口,通過(guò)與XAResource接口交互來(lái)實(shí)現(xiàn)分布式事務(wù)。
RM provider:
XAResource接口需要由資源管理器者來(lái)實(shí)現(xiàn),XAResource接口中定義了一些方法,這些方法將會(huì)被TM進(jìn)行調(diào)用,如:
start方法:開(kāi)啟事務(wù)分支
end方法:結(jié)束事務(wù)分支
prepare方法:準(zhǔn)備提交
commit方法:提交
rollback方法:回滾
recover方法:列出所有處于PREPARED狀態(tài)的事務(wù)分支
ShardingSphere對(duì)XA分布式事務(wù)的支持
ShardingSphere針對(duì)XA分布式事務(wù)的接口以及JTA規(guī)范,提供了標(biāo)準(zhǔn)的,基于SPI實(shí)現(xiàn)的org.apache.shardingsphere.transaction.spi.ShardingTransactionManager。
public?interface?ShardingTransactionManager?extends?AutoCloseable?{/***?Initialize?sharding?transaction?manager.**?@param?databaseType?database?type*?@param?resourceDataSources?resource?data?sources*/void?init(DatabaseType?databaseType,?Collection<ResourceDataSource>?resourceDataSources);/***?Get?transaction?type.**?@return?transaction?type*/TransactionType?getTransactionType();/***?Judge?is?in?transaction?or?not.**?@return?in?transaction?or?not*/boolean?isInTransaction();/***?Get?transactional?connection.**?@param?dataSourceName?data?source?name*?@return?connection*?@throws?SQLException?SQL?exception*/Connection?getConnection(String?dataSourceName)?throws?SQLException;/***?Begin?transaction.*/void?begin();/***?Commit?transaction.*/void?commit();/***?Rollback?transaction.*/void?rollback(); }對(duì)于XA分布式事務(wù)的支持的具體實(shí)現(xiàn)類(lèi)為 :org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager。在此類(lèi)中,會(huì)調(diào)用基于SPI實(shí)現(xiàn)的org.apache.shardingsphere.transaction.xa.spi.XATransactionManager,來(lái)進(jìn)行XA事務(wù)的管理操作。
總結(jié)
我們了解了分布式事務(wù)的CAP理論,了解了X/Open的DTP模型,以及XA的接口規(guī)范,MySQL對(duì)XA協(xié)議的支持。最好我們講解了JTA的規(guī)范,以及ShardingSphere對(duì)XA事務(wù)進(jìn)行整合的時(shí)候定義的SPI接口,這些都是很重要的理論基礎(chǔ),接下來(lái),我們將詳細(xì)來(lái)講解基于AtomkikosXATransactionManager的具體實(shí)現(xiàn),以及源碼解析。
Shardingsphere整合Atomikos對(duì)XA分布式事務(wù)的源碼解析
Atomikos(https://www.atomikos.com/),其實(shí)是一家公司的名字,提供了基于JTA規(guī)范的XA分布式事務(wù)TM的實(shí)現(xiàn)。其旗下最著名的產(chǎn)品就是事務(wù)管理器。產(chǎn)品分兩個(gè)版本:
TransactionEssentials:開(kāi)源的免費(fèi)產(chǎn)品;
ExtremeTransactions:上商業(yè)版,需要收費(fèi)。
這兩個(gè)產(chǎn)品的關(guān)系如下圖所示:
ExtremeTransactions在TransactionEssentials的基礎(chǔ)上額外提供了以下功能(重要的):
支持TCC:這是一種柔性事務(wù)
支持通過(guò)RMI、IIOP、SOAP這些遠(yuǎn)程過(guò)程調(diào)用技術(shù),進(jìn)行事務(wù)傳播。
事務(wù)日志云存儲(chǔ),云端對(duì)事務(wù)進(jìn)行恢復(fù),并且提供了完善的管理后臺(tái)。
org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager詳解
我們簡(jiǎn)單的來(lái)回顧下org.apache.shardingsphere.transaction.spi.ShardingTransactionManager
public?interface?ShardingTransactionManager?extends?AutoCloseable?{/***?Initialize?sharding?transaction?manager.**?@param?databaseType?database?type*?@param?resourceDataSources?resource?data?sources*/void?init(DatabaseType?databaseType,?Collection<ResourceDataSource>?resourceDataSources);/***?Get?transaction?type.**?@return?transaction?type*/TransactionType?getTransactionType();/***?Judge?is?in?transaction?or?not.**?@return?in?transaction?or?not*/boolean?isInTransaction();/***?Get?transactional?connection.**?@param?dataSourceName?data?source?name*?@return?connection*?@throws?SQLException?SQL?exception*/Connection?getConnection(String?dataSourceName)?throws?SQLException;/***?Begin?transaction.*/void?begin();/***?Commit?transaction.*/void?commit();/***?Rollback?transaction.*/void?rollback(); }我們重點(diǎn)先關(guān)注init方法,從它的命名,你就應(yīng)該能夠看出來(lái),這是整個(gè)框架的初始化方法,讓我們來(lái)看看它是如何進(jìn)行初始化的。
private?final?Map<String,?XATransactionDataSource>?cachedDataSources?=?new?HashMap<>();private?final?XATransactionManager?xaTransactionManager?=?XATransactionManagerLoader.getInstance().getTransactionManager();@Overridepublic?void?init(final?DatabaseType?databaseType,?final?Collection<ResourceDataSource>?resourceDataSources)?{for?(ResourceDataSource?each?:?resourceDataSources)?{cachedDataSources.put(each.getOriginalName(),?new?XATransactionDataSource(databaseType,?each.getUniqueResourceName(),?each.getDataSource(),?xaTransactionManager));}xaTransactionManager.init();}首先SPI的方式加載XATransactionManager的具體實(shí)現(xiàn)類(lèi),這里返回的就是org.apache.shardingsphere.transaction.xa.atomikos.manager.AtomikosTransactionManager。
我們?cè)陉P(guān)注下 new XATransactionDataSource() , 進(jìn)入 org.apache.shardingsphere.transaction.xa.jta.datasource。XATransactionDataSource類(lèi)的構(gòu)造方法。
我們重點(diǎn)來(lái)關(guān)注 XADataSourceFactory.build(databaseType, dataSource),從名字我們就可以看出,這應(yīng)該是返回JTA規(guī)范里面的XADataSource,在ShardingSphere里面很多的功能,可以從代碼風(fēng)格的命名上就能猜出來(lái),這就是優(yōu)雅代碼(吹一波)。不多逼逼,我們進(jìn)入該方法。
首先又是一個(gè)SPI定義的 XADataSourceDefinitionFactory,它根據(jù)不同的數(shù)據(jù)庫(kù)類(lèi)型,來(lái)加載不同的方言。然后我們進(jìn)入 swap方法。
很簡(jiǎn)明,第一步創(chuàng)建,XADataSource,第二步給它設(shè)置屬性(包含數(shù)據(jù)的連接,用戶(hù)名密碼等),然后返回。
返回 XATransactionDataSource 類(lèi),關(guān)注xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource); 從名字可以看出,這是注冊(cè)事務(wù)恢復(fù)資源。這個(gè)我們?cè)谑聞?wù)恢復(fù)的時(shí)候詳解。
返回 XAShardingTransactionManager.init() ,我們重點(diǎn)來(lái)關(guān)注:xaTransactionManager.init();,最后進(jìn)入AtomikosTransactionManager.init()。流程圖如下:
代碼:
public?final?class?AtomikosTransactionManager?implements?XATransactionManager?{private?final?UserTransactionManager?transactionManager?=?new?UserTransactionManager();private?final?UserTransactionService?userTransactionService?=?new?UserTransactionServiceImp();@Overridepublic?void?init()?{userTransactionService.init();}}進(jìn)入U(xiǎn)serTransactionServiceImp.init()
我們重點(diǎn)關(guān)注,獲取配置屬性。最后進(jìn)入com.atomikos.icatch.provider.imp.AssemblerImp.initializeProperties()方法。
接下來(lái)重點(diǎn)關(guān)注, Configuration.init(), 進(jìn)行初始化。
我們先來(lái)關(guān)注 assembleSystemComponents(configProperties); 進(jìn)入它,進(jìn)入com.atomikos.icatch.provider.imp.AssemblerImp.assembleTransactionService()方法:
我們重點(diǎn)來(lái)分析createOltpLogFromClasspath(), 采用SPI的加載方式來(lái)獲取,默認(rèn)這里會(huì)返回 null, 什么意思呢?就是當(dāng)沒(méi)有擴(kuò)展的時(shí)候,atomikos,會(huì)創(chuàng)建框架自定義的資源,來(lái)存儲(chǔ)事務(wù)日志。
我們跟著進(jìn)入 Repository repository = createRepository(configProperties);
這里就會(huì)創(chuàng)建出 CachedRepository,里面包含了 ?InMemoryRepository 與 FileSystemRepository
回到主線(xiàn) com.atomikos.icatch.config.Configuration.init(), 最后來(lái)分析下notifyAfterInit();
插件的初始化會(huì)進(jìn)入com.atomikos.icatch.jta.JtaTransactionServicePlugin.afterInit()
重點(diǎn)注意 RecoveryLog recoveryLog = Configuration.getRecoveryLog(); ,如果用戶(hù)采用SPI的方式,擴(kuò)展了com.atomikos.recovery.OltpLog ,這里就會(huì)返回 null。如果是null,則不會(huì)對(duì) XaResourceRecoveryManager 進(jìn)行初始化。
回到 notifyAfterInit(), 我們來(lái)分析 setRecoveryService。
我們進(jìn)入 recover() 方法:
看到最關(guān)鍵的注釋了嗎,如果用戶(hù)采用SPI的方式,擴(kuò)展了com.atomikos.recovery.OltpLog,那么XaResourceRecoveryManager 為null,則就會(huì)進(jìn)行云端恢復(fù),反之則進(jìn)行事務(wù)恢復(fù)。事務(wù)恢復(fù)很復(fù)雜,我們會(huì)單獨(dú)來(lái)講。
到這里atomikos的基本的初始化已經(jīng)完成。
atomikos事務(wù)begin流程
我們知道,本地的事務(wù),都會(huì)有一個(gè) trainsaction.begin, 對(duì)應(yīng)XA分布式事務(wù)來(lái)說(shuō)也不另外,我們?cè)侔阉悸非袚Q回XAShardingTransactionManager.begin(), 會(huì)調(diào)用com.atomikos.icatch.jta.TransactionManagerImp.begin()。流程圖如下:
代碼:
??public?void?begin?(?int?timeout?)?throws?NotSupportedException,SystemException{CompositeTransaction?ct?=?null;ResumePreviousTransactionSubTxAwareParticipant?resumeParticipant?=?null;ct?=?compositeTransactionManager.getCompositeTransaction();if?(?ct?!=?null?&&?ct.getProperty?(??JTA_PROPERTY_NAME?)?==?null?)?{LOGGER.logWarning?(?"JTA:?temporarily?suspending?incompatible?transaction:?"?+?ct.getTid()?+"?(will?be?resumed?after?JTA?transaction?ends)"?);ct?=?compositeTransactionManager.suspend();resumeParticipant?=?new?ResumePreviousTransactionSubTxAwareParticipant?(?ct?);}try?{//創(chuàng)建事務(wù)補(bǔ)償點(diǎn)ct?=?compositeTransactionManager.createCompositeTransaction?(?(?(?long?)?timeout?)?*?1000?);if?(?resumeParticipant?!=?null?)?ct.addSubTxAwareParticipant?(?resumeParticipant?);if?(?ct.isRoot?()?&&?getDefaultSerial?()?)ct.setSerial?();ct.setProperty?(?JTA_PROPERTY_NAME?,?"true"?);}?catch?(?SysException?se?)?{String?msg?=?"Error?in?begin()";LOGGER.logError(?msg?,?se?);throw?new?ExtendedSystemException?(?msg?,?se?);}recreateCompositeTransactionAsJtaTransaction(ct);}這里我們主要關(guān)注 compositeTransactionManager.createCompositeTransaction(),
創(chuàng)建了事務(wù)補(bǔ)償點(diǎn),然后把他放到了用當(dāng)前線(xiàn)程作為key的Map當(dāng)中,這里思考,為啥它不用 threadLocal。
到這里atomikos的事務(wù)begin流程已經(jīng)完成。大家可能有些疑惑,begin好像什么都沒(méi)有做,XA start 也沒(méi)調(diào)用?別慌,下一節(jié)繼續(xù)來(lái)講。
XATransactionDataSource getConnection() 流程
我們都知道想要執(zhí)行SQL語(yǔ)句,必須要獲取到數(shù)據(jù)庫(kù)的connection。讓我們?cè)倩氐?XAShardingTransactionManager.getConnection() 最后會(huì)調(diào)用到org.apache.shardingsphere.transaction.xa.jta.datasourceXATransactionDataSource.getConnection()。流程圖如下:
代碼 :
?public?Connection?getConnection()?throws?SQLException,?SystemException,?RollbackException?{//先檢查是否已經(jīng)有存在的connection,這一步很關(guān)心,也是XA的關(guān)鍵,因?yàn)閄A事務(wù),必須在同一個(gè)connectionif?(CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName()))?{return?dataSource.getConnection();}//獲取數(shù)據(jù)庫(kù)連接Connection?result?=?dataSource.getConnection();//轉(zhuǎn)成XAConnection,其實(shí)是同一個(gè)連接X(jué)AConnection?xaConnection?=?XAConnectionFactory.createXAConnection(databaseType,?xaDataSource,?result);//獲取JTA事務(wù)定義接口Transaction?transaction?=?xaTransactionManager.getTransactionManager().getTransaction();if?(!enlistedTransactions.get().contains(transaction))?{//進(jìn)行資源注冊(cè)transaction.enlistResource(new?SingleXAResource(resourceName,?xaConnection.getXAResource()));transaction.registerSynchronization(new?Synchronization()?{@Overridepublic?void?beforeCompletion()?{enlistedTransactions.get().remove(transaction);}@Overridepublic?void?afterCompletion(final?int?status)?{enlistedTransactions.get().clear();}});enlistedTransactions.get().add(transaction);}return?result;}首先第一步很關(guān)心,尤其是對(duì)shardingsphere來(lái)說(shuō),因?yàn)樵谝粋€(gè)事務(wù)里面,會(huì)有多個(gè)SQL語(yǔ)句,打到相同的數(shù)據(jù)庫(kù),所以對(duì)相同的數(shù)據(jù)庫(kù),必須獲取同一個(gè)XAConnection,這樣才能進(jìn)行XA事務(wù)的提交與回滾。
我們接下來(lái)關(guān)心 transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, 會(huì)進(jìn)入com.atomikos.icatch.jta.TransactionImp.enlistResource(), 代碼太長(zhǎng),截取一部分。
我們直接看 restx.resume();
哦多尅,看見(jiàn)了嗎,各位,看見(jiàn)了 this.xaresource.start(this.xid, flag); 了嗎????,我們進(jìn)去,假設(shè)我們使用的Mysql數(shù)據(jù)庫(kù):
組裝XA start Xid SQL語(yǔ)句,進(jìn)行執(zhí)行。
到這里,我們總結(jié)下,在獲取數(shù)據(jù)庫(kù)連接的時(shí)候,我們執(zhí)行了XA協(xié)議接口中的 XA start xid
atomikos事務(wù)commit流程
好了,上面我們已經(jīng)開(kāi)啟了事務(wù),現(xiàn)在我們來(lái)分析下事務(wù)commit流程,我們?cè)侔岩暯乔袚Q回XAShardingTransactionManager.commit(),最后我們會(huì)進(jìn)入com.atomikos.icatch.imp.CompositeTransactionImp.commit() 方法。流程圖如下:
代碼:
?public?void?commit?()?throws?HeurRollbackException,?HeurMixedException,HeurHazardException,?SysException,?SecurityException,RollbackException{//首先更新下事務(wù)日志的狀態(tài)doCommit?();setSiblingInfoForIncoming1pcRequestFromRemoteClient();if?(?isRoot?()?)?{//真正的commit操作coordinator.terminate?(?true?);}}我們關(guān)注coordinator.terminate ( true );
首先會(huì)判斷參與者的個(gè)數(shù),這里我們可以理解為MySQL的database數(shù)量,如果只有一個(gè),退化成一階段,直接提交。如果有多個(gè),則走標(biāo)準(zhǔn)的XA二階段提交流程。
我們來(lái)看 prepare (); 流程,最后會(huì)走到com.atomikos.icatch.imp.PrepareMessage.send() ---> com.atomikos.datasource.xa.XAResourceTransaction.prepare()
終于,我們看到了這么一句 ret = this.xaresource.prepare(this.xid); 但是等等,我們之前不是說(shuō)了,XA start xid 以后要先 XA end xid 嗎?答案就在 suspend(); 里面。
到了這里,我們已經(jīng)執(zhí)行了 XA start xid -> XA end xid --> XA prepare xid, 接下來(lái)就是最后一步 commit
我們?cè)倩氐?terminate(false) 方法,來(lái)看 commit()流程。其實(shí)和 prepare流程一樣,最后會(huì)走到 com.atomikos.datasource.xa.XAResourceTransaction.commit()。commit執(zhí)行完,數(shù)據(jù)提交
思考:這里的參與者提交是在一個(gè)循環(huán)里面,一個(gè)一個(gè)提交的,如果之前的提交了,后面的參與者提交的時(shí)候,掛了,就會(huì)造成數(shù)據(jù)的不一致性。
Atomikos rollback() 流程
上面我們已經(jīng)分析了commit流程,其實(shí)rollback流程和commit流程一樣,我們?cè)诎涯抗馇袚Q回 org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback() ,最后會(huì)執(zhí)行到com.atomikos.icatch.imp.CompositeTransactionImp.rollback()。
重點(diǎn)關(guān)注 coordinator.terminate ( false ); ,這個(gè)和 commit流程是一樣的,只不過(guò)在 commit流程里面,參數(shù)傳的是true。
我們重點(diǎn)關(guān)注 rollback() ,最后會(huì)走到com.atomikos.datasource.xa.XAResourceTransaction.rollback()。
先在supend()方法里面執(zhí)行了 XA end xid 語(yǔ)句, 接下來(lái)執(zhí)行 this.xaresource.rollback(this.xid); 進(jìn)行數(shù)據(jù)的回滾。
Atomikos-recover 流程
說(shuō)事務(wù)恢復(fù)流程之前,我們來(lái)討論下,會(huì)啥會(huì)出現(xiàn)事務(wù)恢復(fù)?XA二階段提交協(xié)議不是強(qiáng)一致性的嗎?要解答這個(gè)問(wèn)題,我們就要來(lái)看看XA二階段協(xié)議有什么問(wèn)題?
問(wèn)題一 :單點(diǎn)故障
由于協(xié)調(diào)者的重要性,一旦協(xié)調(diào)者TM發(fā)生故障。參與者RM會(huì)一直阻塞下去。尤其在第二階段,協(xié)調(diào)者發(fā)生故障,那么所有的參與者還都處于鎖定事務(wù)資源的狀態(tài)中,而無(wú)法繼續(xù)完成事務(wù)操作。(如果是協(xié)調(diào)者掛掉,可以重新選舉一個(gè)協(xié)調(diào)者,但是無(wú)法解決因?yàn)閰f(xié)調(diào)者宕機(jī)導(dǎo)致的參與者處于阻塞狀態(tài)的問(wèn)題)
問(wèn)題二 :數(shù)據(jù)不一致
數(shù)據(jù)不一致。在二階段提交的階段二中,當(dāng)協(xié)調(diào)者向參與者發(fā)送commit請(qǐng)求之后,發(fā)生了局部網(wǎng)絡(luò)異常或者在發(fā)送commit請(qǐng)求過(guò)程中協(xié)調(diào)者發(fā)生了故障,這會(huì)導(dǎo)致只有一部分參與者接受到了commit請(qǐng)求。而在這部分參與者接到commit請(qǐng)求之后就會(huì)執(zhí)行commit操作。但是其他部分未接到commit請(qǐng)求的機(jī)器則無(wú)法執(zhí)行事務(wù)提交。于是整個(gè)分布式系統(tǒng)便出現(xiàn)了數(shù)據(jù)不一致性的現(xiàn)象。
如何解決?
解決的方案簡(jiǎn)單,就是我們?cè)谑聞?wù)的操作的每一步,我們都需要對(duì)事務(wù)狀態(tài)的日志進(jìn)行人為的記錄,我們可以把日志記錄存儲(chǔ)在我們想存儲(chǔ)的地方,可以是本地存儲(chǔ),也可以中心化的存儲(chǔ)。atomikos的開(kāi)源版本,我們之前也分析了,它是使用內(nèi)存 + file的方式,存儲(chǔ)在本地,這樣的話(huà),如果在一個(gè)集群系統(tǒng)里面,如果有節(jié)點(diǎn)宕機(jī),日志又存儲(chǔ)在本地,所以事務(wù)不能及時(shí)的恢復(fù)(需要重啟服務(wù))。
Atomikos 多場(chǎng)景下事務(wù)恢復(fù)。
Atomikos 提供了二種方式,來(lái)應(yīng)對(duì)不同場(chǎng)景下的異常情況。
場(chǎng)景一:服務(wù)節(jié)點(diǎn)不宕機(jī),因?yàn)槠渌脑?#xff0c;產(chǎn)生需要事務(wù)恢復(fù)的情況。這個(gè)時(shí)候才要定時(shí)任務(wù)進(jìn)行恢復(fù)。具體的代碼 com.atomikos.icatch.imp.TransactionServiceImp.init() 方法,會(huì)初始化一個(gè)定時(shí)任務(wù),進(jìn)行事務(wù)的恢復(fù)。
最終會(huì)進(jìn)入com.atomikos.datasource.xa.XATransactionalResource.recover() 方法。
場(chǎng)景二: 當(dāng)服務(wù)節(jié)點(diǎn)宕機(jī)重啟動(dòng)過(guò)程中進(jìn)行事務(wù)的恢復(fù)。具體實(shí)現(xiàn)在com.atomikos.datasource.xa.XATransactionalResource.setRecoveryService()方法里面
com.atomikos.datasource.xa.XATransactionalResource.recover() 流程詳解。
主代碼:
?public?void?recover(XAResource?xaResource)?throws?XAException?{//?根據(jù)XA?recovery?協(xié)議獲取?xidList<XID>?xidsToRecover?=?retrievePreparedXidsFromXaResource(xaResource);Collection<XID>?xidsToCommit;try?{//?xid?與日志記錄的xid進(jìn)行匹配xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();for?(XID?xid?:?xidsToRecover)?{if?(xidsToCommit.contains(xid))?{//執(zhí)行?XA?commit?xid?進(jìn)行提交replayCommit(xid,?xaResource);}?else?{attemptPresumedAbort(xid,?xaResource);}}}?catch?(LogException?couldNotRetrieveCommittingXids)?{LOGGER.logWarning("Transient?error?while?recovering?-?will?retry?later...",?couldNotRetrieveCommittingXids);}}我們來(lái)看一下如何根據(jù) XA recovery 協(xié)議獲取RM端存儲(chǔ)的xid。進(jìn)入方法 retrievePreparedXidsFromXaResource(xaResource), 最后進(jìn)入 com.atomikos.datasource.xa.RecoveryScan.recoverXids()方法。
我們重點(diǎn)關(guān)注xidsFromLastScan = xaResource.recover(flags); 這個(gè)方法,如果我們使用MySQL,那么就會(huì)進(jìn)入 MysqlXAConnection.recover()方法。執(zhí)行 XA recovery xid 語(yǔ)句來(lái)獲取 xid。
這里要注意如果Mysql的版本 <5.7.7 ,則不會(huì)有任何數(shù)據(jù),在以后的版本中Mysql進(jìn)行了修復(fù),因此如果我們想要使用MySQL充當(dāng)RM,版本必須 >= 5.7.7 ,原因是:
MySQL 5.6版本在客戶(hù)端退出的時(shí)候,自動(dòng)把已經(jīng)prepare的事務(wù)回滾了,那么MySQL為什么要這樣做?這主要取決于MySQL的內(nèi)部實(shí)現(xiàn),MySQL 5.7以前的版本,對(duì)于prepare的事務(wù),MySQL是不會(huì)記錄binlog的(官方說(shuō)是減少fsync,起到了優(yōu)化的作用)。只有當(dāng)分布式事務(wù)提交的時(shí)候才會(huì)把前面的操作寫(xiě)入binlog信息,所以對(duì)于binlog來(lái)說(shuō),分布式事務(wù)與普通的事務(wù)沒(méi)有區(qū)別,而prepare以前的操作信息都保存在連接的IO_CACHE中,如果這個(gè)時(shí)候客戶(hù)端退出了,以前的binlog信息都會(huì)被丟失,再次重連后允許提交的話(huà),會(huì)造成Binlog丟失,從而造成主從數(shù)據(jù)的不一致,所以官方在客戶(hù)端退出的時(shí)候直接把已經(jīng)prepare的事務(wù)都回滾了!
回到主線(xiàn)再?gòu)淖约河涗浀氖聞?wù)日志里面獲取XID
我們來(lái)看下獲取事務(wù)日志里面的XID的retrieveExpiredCommittingXidsFromLog()方法。然后進(jìn)入com.atomikos.recovery.imp.RecoveryLogImp.getCommittingParticipants()方法。
到這里我們來(lái)簡(jiǎn)單介紹一下,事務(wù)日志的存儲(chǔ)結(jié)構(gòu)。首先是 CoordinatorLogEntry,這是一次XA事務(wù)的所有信息實(shí)體類(lèi)。
public?class?CoordinatorLogEntry?implements?Serializable?{//全局事務(wù)idpublic?final?String?id;//是否已經(jīng)提交public?final?boolean?wasCommitted;/***?Only?for?subtransactions,?null?otherwise.*/public?final?String?superiorCoordinatorId;//參與者集合public?final?ParticipantLogEntry[]?participants; }再來(lái)看一下參與者實(shí)體類(lèi) ParticipantLogEntry :
回到com.atomikos.recovery.xa.DefaultXaRecoveryLog.getExpiredCommittingXids() 方法,可以到獲取了一次XA事務(wù)過(guò)程中,存儲(chǔ)的事務(wù)日志中的xid。
如果從RM中通過(guò)XA recovery取出的XID,包含在從事務(wù)日志中取出的XID,則進(jìn)行commit,否則進(jìn)行rollback.
replayCommit 方法如下:
attemptPresumedAbort(xid, xaResource); 方法如下:
總結(jié)
文章到此,已經(jīng)寫(xiě)的很長(zhǎng)很多了,我們分析了ShardingSphere對(duì)于XA方案,提供了一套SPI解決方案,對(duì)Atomikos進(jìn)行了整合,也分析了Atomikos初始化流程,開(kāi)始事務(wù)流程,獲取連接流程,提交事務(wù)流程,回滾事務(wù)流程,事務(wù)恢復(fù)流程。希望對(duì)大家理解XA的原理有所幫助。
加入我們
Apache ShardingSphere 一直踐行Apache Way的開(kāi)源之道,社區(qū)完全開(kāi)放與平等,人人享受開(kāi)源帶來(lái)的快樂(lè)。
地址: https://github.com/apache/shardingsphere
作者介紹:肖宇,Apache ShardingSphere Committer,開(kāi)源hmily分布式事務(wù)框架作者, 開(kāi)源soul網(wǎng)關(guān)作者,熱愛(ài)開(kāi)源,追求寫(xiě)優(yōu)雅代碼。目前就職入京東數(shù)科,參與ShardingSphere的開(kāi)源建設(shè),以及分布式數(shù)據(jù)庫(kù)的研發(fā)工作。
想知道更多?掃描下面的二維碼關(guān)注我
后臺(tái)回復(fù)"技術(shù)",加入技術(shù)群
【精彩推薦】
原創(chuàng)|OpenAPI標(biāo)準(zhǔn)規(guī)范
如此簡(jiǎn)單| ES最全詳細(xì)使用教程
ClickHouse到底是什么?為什么如此牛逼!
原來(lái)ElasticSearch還可以這么理解
面試官:InnoDB中一棵B+樹(shù)可以存放多少行數(shù)據(jù)?
微服務(wù)下如何解耦?對(duì)于已經(jīng)緊耦合下如何重構(gòu)?
如何構(gòu)建一套高性能、高可用、低成本的視頻處理系統(tǒng)?
架構(gòu)之道:分離業(yè)務(wù)邏輯和技術(shù)細(xì)節(jié)
星巴克不使用兩階段提交
點(diǎn)個(gè)贊+在看,少個(gè) bug?????
總結(jié)
以上是生活随笔為你收集整理的深度剖析Apache Shardingsphere对分布式事务的支持的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 收藏:DPDK内存基本概念
- 下一篇: 面试题:1 到 1000 之间有多少个