日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

深度剖析Apache Shardingsphere对分布式事务的支持

發(fā)布時(shí)間:2024/4/11 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 深度剖析Apache Shardingsphere对分布式事务的支持 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Apache ShardingSphere 是一套開(kāi)源的分布式數(shù)據(jù)庫(kù)中間件解決方案組成的生態(tài)圈,它由 JDBC、Proxy 和 Sidecar(規(guī)劃中)這 3 款相互獨(dú)立,卻又能夠混合部署配合使用的產(chǎn)品組成。它們均提供標(biāo)準(zhǔn)化的數(shù)據(jù)分片、分布式事務(wù)和數(shù)據(jù)庫(kù)治理功能,可適用于如 Java 同構(gòu)、異構(gòu)語(yǔ)言、云原生等各種多樣化的應(yīng)用場(chǎng)景。ShardingSphere 已于2020年4月16日成為 Apache 軟件基金會(huì)的頂級(jí)項(xiàng)目。

分布式系統(tǒng)CAP理論

一致性(Consistency)
  • 一致性指 all nodes see the same data at the same time,即更新操作成功并返回客戶(hù)端完成后,所有節(jié)點(diǎn)在同一時(shí)間的數(shù)據(jù)完全一致,不能存在中間狀態(tài)。

  • 關(guān)于一致性,如果用戶(hù)時(shí)刻看到的數(shù)據(jù)都是一致的,那么稱(chēng)之為強(qiáng)一致性。如果允許存在中間狀態(tài),只要求經(jīng)過(guò)一段時(shí)間后,數(shù)據(jù)最終是一致的,則稱(chēng)之為最終一致性。此外,如果允許存在部分?jǐn)?shù)據(jù)不一致,那么就稱(chēng)之為弱一致性

可用性(Availability)
  • 可用性是指系統(tǒng)提供的服務(wù)必須一直處于可用的狀態(tài),對(duì)于用戶(hù)的每一個(gè)操作請(qǐng)求總是能夠在有限的時(shí)間內(nèi)返回結(jié)果。有限的時(shí)間內(nèi)是指:對(duì)于用戶(hù)的一個(gè)操作請(qǐng)求,系統(tǒng)必須能夠在指定的時(shí)間內(nèi)返回對(duì)應(yīng)的處理結(jié)果,如果超過(guò)了這個(gè)時(shí)間范圍,那么系統(tǒng)就被認(rèn)為是不可用的。

  • 返回結(jié)果是可用性的另一個(gè)非常重要的指標(biāo),它要求系統(tǒng)在完成對(duì)用戶(hù)請(qǐng)求的處理后,返回一個(gè)正常的響應(yīng)結(jié)果,不論這個(gè)結(jié)果是成功還是失敗。

分區(qū)容錯(cuò)性(Partition tolerance )
  • 布式系統(tǒng)在遇到任何網(wǎng)絡(luò)分區(qū)故障的時(shí)候,仍然需要能夠保證對(duì)外提供滿(mǎn)足一致性和可用性的服務(wù),除非是整個(gè)網(wǎng)絡(luò)環(huán)境都發(fā)生了故障。

X/Open DTP模型與XA規(guī)范

X/Open,即現(xiàn)在的open group,是一個(gè)獨(dú)立的組織,主要負(fù)責(zé)制定各種行業(yè)技術(shù)標(biāo)準(zhǔn)。官網(wǎng)地址:http://www.opengroup.org/。X/Open組織主要由各大知名公司或者廠(chǎng)商進(jìn)行支持,這些組織不光遵循X/Open組織定義的行業(yè)技術(shù)標(biāo)準(zhǔn),也參與到標(biāo)準(zhǔn)的制定。下圖展示了open group目前主要成員(官網(wǎng)截圖):

DTP模型

  • 應(yīng)用程序(Application Program ,簡(jiǎn)稱(chēng)AP):用于定義事務(wù)邊界(即定義事務(wù)的開(kāi)始和結(jié)束),并且在事務(wù)邊界內(nèi)對(duì)資源進(jìn)行操作。

  • 資源管理器(Resource Manager,簡(jiǎn)稱(chēng)RM,一般也稱(chēng)為事務(wù)參與者):如數(shù)據(jù)庫(kù)、文件系統(tǒng)等,并提供訪(fǎng)問(wèn)資源的方式。

  • 事務(wù)管理器(Transaction Manager ,簡(jiǎn)稱(chēng)TM,一般也稱(chēng)為事務(wù)協(xié)調(diào)者):負(fù)責(zé)分配事務(wù)唯一標(biāo)識(shí),監(jiān)控事務(wù)的執(zhí)行進(jìn)度,并負(fù)責(zé)事務(wù)的提交、回滾等。

XA規(guī)范

這里的接口規(guī)范特別多,我們只要來(lái)講講幾個(gè)最重要的。

  • xa_start : 在 RM端調(diào)用此接口開(kāi)啟一個(gè)XA事務(wù),后面需要接上XID 作為參數(shù)。

  • xa_end : 取消當(dāng)前線(xiàn)程與事務(wù)的關(guān)聯(lián), 與 xa_start是配對(duì)使用。

  • xa_prepare : 詢(xún)問(wèn)RM 是否已經(jīng)準(zhǔn)備好了提交事務(wù)。

  • xa_commit : 通知RM 提交事務(wù)分支。

  • xa_rollback : 通知RM 提交回滾事務(wù)分支。

XA二階段提交
  • 階段一 :TM通知各個(gè)RM準(zhǔn)備提交它們的事務(wù)分支。如果RM判斷自己進(jìn)行的工作可以被提交,那就就對(duì)工作內(nèi)容進(jìn)行持久化,再給TM肯定答復(fù);要是發(fā)生了其他情況,那給TM的都是否定答復(fù)。在發(fā)送了否定答復(fù)并回滾了已經(jīng)的工作后,RM就可以丟棄這個(gè)事務(wù)分支信息。

  • 階段二 :TM根據(jù)階段1各個(gè)RM prepare的結(jié)果,決定是提交還是回滾事務(wù)。如果所有的RM都prepare成功,那么TM通知所有的RM進(jìn)行提交;如果有RM prepare失敗的話(huà),則TM通知所有RM回滾自己的事務(wù)分支。

MySQL對(duì)XA協(xié)議的支持

MySQL 從5.0.3開(kāi)始支持XA分布式事務(wù),且只有InnoDB存儲(chǔ)引擎支持XA事務(wù)。MySQL 在DTP模型中也是屬于資源管理器RM。

MySQL XA 事務(wù)的 SQL語(yǔ)法
XA?START?xid????//開(kāi)啟XA事務(wù),xid是一個(gè)唯一值,表示事務(wù)分支標(biāo)識(shí)符 XA?END?xid??//結(jié)束一個(gè)XA事務(wù), XA?PREPARE?xid?準(zhǔn)備提交 XA COMMIT xid [ONE PHASE]?//提交事務(wù)。兩階段提交協(xié)議中,如果只有一個(gè)RM參與,那么可以?xún)?yōu)化為一階段提交 XA?ROLLBACK?xid??//回滾 XA?RECOVER?[CONVERT?XID]??//列出所有處于PREPARE階段的XA事務(wù)
MySQL xid詳解

mysql中使用xid來(lái)作為一個(gè)事務(wù)分支的標(biāo)識(shí)符。通過(guò)C語(yǔ)言進(jìn)行描述,如下:

/? ??Transaction?branch?identification:?XID?and?NULLXID: ?/ #define?XIDDATASIZE?128??/??size?in?bytes??/ #define?MAXGTRIDSIZE?64??/??maximum?size?in?bytes?of?gtrid??/ #define?MAXBQUALSIZE?64??/??maximum?size?in?bytes?of?bqual??/ struct?xid_t?{long?formatID;?????/*?format?identifier?*/long?gtrid_length;?/*?value?1-64?*/long?bqual_length;?/*?value?1-64?*/char?data[XIDDATASIZE];}; /? ??A?value?of?-1?in?formatID?means?that?the?XID?is?null. ?/ typedef?struct?xid_t?XID; /? ??Declarations?of?routines?by?which?RMs?call?TMs: ?/ extern?int?ax_reg(int,?XID??,?long); extern?int?ax_unreg(int,?long);
  • gtrid :全局事務(wù)標(biāo)識(shí)符(global transaction identifier),最大不能超過(guò)64字節(jié)。

  • bqual :分支限定符(branch qualifier),最大不能超過(guò)64字節(jié)。

  • formatId:記錄gtrid、bqual的格式,類(lèi)似于memcached中flags字段的作用。

  • data :xid的值,其是 gtrid和bqual拼接后的內(nèi)容。。

MySQL XA事務(wù)狀態(tài)

JTA規(guī)范

JTA(Java Transaction API):為J2EE平臺(tái)提供了分布式事務(wù)服務(wù)(distributed transaction)的能力。某種程度上,可以認(rèn)為JTA規(guī)范是XA規(guī)范的Java版,其把XA規(guī)范中規(guī)定的DTP模型交互接口抽象成Java接口中的方法,并規(guī)定每個(gè)方法要實(shí)現(xiàn)什么樣的功能。

JTA 定義的接口
  • javax.transaction.TransactionManager : 事務(wù)管理器,負(fù)責(zé)事務(wù)的begin, commit,rollback 等命令。

  • javax.transaction.UserTransaction:用于聲明一個(gè)分布式事務(wù)。

  • javax.transaction.TransactionSynchronizationRegistry:事務(wù)同步注冊(cè)

  • javax.transaction.xa.XAResource:定義RM提供給TM操作的接口

  • javax.transaction.xa.Xid:事務(wù)xid接口。

TM provider:
  • 實(shí)現(xiàn)UserTransaction、TransactionManager、Transaction、TransactionSynchronizationRegistry、Synchronization、Xid接口,通過(guò)與XAResource接口交互來(lái)實(shí)現(xiàn)分布式事務(wù)。

RM provider:
  • XAResource接口需要由資源管理器者來(lái)實(shí)現(xiàn),XAResource接口中定義了一些方法,這些方法將會(huì)被TM進(jìn)行調(diào)用,如:

    • start方法:開(kāi)啟事務(wù)分支

    • end方法:結(jié)束事務(wù)分支

    • prepare方法:準(zhǔn)備提交

    • commit方法:提交

    • rollback方法:回滾

    • recover方法:列出所有處于PREPARED狀態(tài)的事務(wù)分支

ShardingSphere對(duì)XA分布式事務(wù)的支持

ShardingSphere針對(duì)XA分布式事務(wù)的接口以及JTA規(guī)范,提供了標(biāo)準(zhǔn)的,基于SPI實(shí)現(xiàn)的org.apache.shardingsphere.transaction.spi.ShardingTransactionManager。

public?interface?ShardingTransactionManager?extends?AutoCloseable?{/***?Initialize?sharding?transaction?manager.**?@param?databaseType?database?type*?@param?resourceDataSources?resource?data?sources*/void?init(DatabaseType?databaseType,?Collection<ResourceDataSource>?resourceDataSources);/***?Get?transaction?type.**?@return?transaction?type*/TransactionType?getTransactionType();/***?Judge?is?in?transaction?or?not.**?@return?in?transaction?or?not*/boolean?isInTransaction();/***?Get?transactional?connection.**?@param?dataSourceName?data?source?name*?@return?connection*?@throws?SQLException?SQL?exception*/Connection?getConnection(String?dataSourceName)?throws?SQLException;/***?Begin?transaction.*/void?begin();/***?Commit?transaction.*/void?commit();/***?Rollback?transaction.*/void?rollback(); }

對(duì)于XA分布式事務(wù)的支持的具體實(shí)現(xiàn)類(lèi)為 :org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager。在此類(lèi)中,會(huì)調(diào)用基于SPI實(shí)現(xiàn)的org.apache.shardingsphere.transaction.xa.spi.XATransactionManager,來(lái)進(jìn)行XA事務(wù)的管理操作。

總結(jié)

我們了解了分布式事務(wù)的CAP理論,了解了X/Open的DTP模型,以及XA的接口規(guī)范,MySQL對(duì)XA協(xié)議的支持。最好我們講解了JTA的規(guī)范,以及ShardingSphere對(duì)XA事務(wù)進(jìn)行整合的時(shí)候定義的SPI接口,這些都是很重要的理論基礎(chǔ),接下來(lái),我們將詳細(xì)來(lái)講解基于AtomkikosXATransactionManager的具體實(shí)現(xiàn),以及源碼解析。

Shardingsphere整合Atomikos對(duì)XA分布式事務(wù)的源碼解析

Atomikos(https://www.atomikos.com/),其實(shí)是一家公司的名字,提供了基于JTA規(guī)范的XA分布式事務(wù)TM的實(shí)現(xiàn)。其旗下最著名的產(chǎn)品就是事務(wù)管理器。產(chǎn)品分兩個(gè)版本:

  • TransactionEssentials:開(kāi)源的免費(fèi)產(chǎn)品;

  • ExtremeTransactions:上商業(yè)版,需要收費(fèi)。

這兩個(gè)產(chǎn)品的關(guān)系如下圖所示:

ExtremeTransactions在TransactionEssentials的基礎(chǔ)上額外提供了以下功能(重要的):

  • 支持TCC:這是一種柔性事務(wù)

  • 支持通過(guò)RMI、IIOP、SOAP這些遠(yuǎn)程過(guò)程調(diào)用技術(shù),進(jìn)行事務(wù)傳播。

  • 事務(wù)日志云存儲(chǔ),云端對(duì)事務(wù)進(jìn)行恢復(fù),并且提供了完善的管理后臺(tái)。

org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager詳解

我們簡(jiǎn)單的來(lái)回顧下org.apache.shardingsphere.transaction.spi.ShardingTransactionManager

public?interface?ShardingTransactionManager?extends?AutoCloseable?{/***?Initialize?sharding?transaction?manager.**?@param?databaseType?database?type*?@param?resourceDataSources?resource?data?sources*/void?init(DatabaseType?databaseType,?Collection<ResourceDataSource>?resourceDataSources);/***?Get?transaction?type.**?@return?transaction?type*/TransactionType?getTransactionType();/***?Judge?is?in?transaction?or?not.**?@return?in?transaction?or?not*/boolean?isInTransaction();/***?Get?transactional?connection.**?@param?dataSourceName?data?source?name*?@return?connection*?@throws?SQLException?SQL?exception*/Connection?getConnection(String?dataSourceName)?throws?SQLException;/***?Begin?transaction.*/void?begin();/***?Commit?transaction.*/void?commit();/***?Rollback?transaction.*/void?rollback(); }

我們重點(diǎn)先關(guān)注init方法,從它的命名,你就應(yīng)該能夠看出來(lái),這是整個(gè)框架的初始化方法,讓我們來(lái)看看它是如何進(jìn)行初始化的。

private?final?Map<String,?XATransactionDataSource>?cachedDataSources?=?new?HashMap<>();private?final?XATransactionManager?xaTransactionManager?=?XATransactionManagerLoader.getInstance().getTransactionManager();@Overridepublic?void?init(final?DatabaseType?databaseType,?final?Collection<ResourceDataSource>?resourceDataSources)?{for?(ResourceDataSource?each?:?resourceDataSources)?{cachedDataSources.put(each.getOriginalName(),?new?XATransactionDataSource(databaseType,?each.getUniqueResourceName(),?each.getDataSource(),?xaTransactionManager));}xaTransactionManager.init();}
  • 首先SPI的方式加載XATransactionManager的具體實(shí)現(xiàn)類(lèi),這里返回的就是org.apache.shardingsphere.transaction.xa.atomikos.manager.AtomikosTransactionManager。

  • 我們?cè)陉P(guān)注下 new XATransactionDataSource() , 進(jìn)入 org.apache.shardingsphere.transaction.xa.jta.datasource。XATransactionDataSource類(lèi)的構(gòu)造方法。

public?XATransactionDataSource(final?DatabaseType?databaseType,?final?String?resourceName,?final?DataSource?dataSource,?final?XATransactionManager?xaTransactionManager)?{this.databaseType?=?databaseType;this.resourceName?=?resourceName;this.dataSource?=?dataSource;if?(!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName()))?{//?重點(diǎn)關(guān)注?1?,返回了xaDatasourcexaDataSource?=?XADataSourceFactory.build(databaseType,?dataSource);this.xaTransactionManager?=?xaTransactionManager;//?重點(diǎn)關(guān)注2?注冊(cè)資源xaTransactionManager.registerRecoveryResource(resourceName,?xaDataSource);}}
  • 我們重點(diǎn)來(lái)關(guān)注 XADataSourceFactory.build(databaseType, dataSource),從名字我們就可以看出,這應(yīng)該是返回JTA規(guī)范里面的XADataSource,在ShardingSphere里面很多的功能,可以從代碼風(fēng)格的命名上就能猜出來(lái),這就是優(yōu)雅代碼(吹一波)。不多逼逼,我們進(jìn)入該方法。

public?final?class?XADataSourceFactory?{public?static?XADataSource?build(final?DatabaseType?databaseType,?final?DataSource?dataSource)?{return?new?DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource);} }
  • 首先又是一個(gè)SPI定義的 XADataSourceDefinitionFactory,它根據(jù)不同的數(shù)據(jù)庫(kù)類(lèi)型,來(lái)加載不同的方言。然后我們進(jìn)入 swap方法。

?public?XADataSource?swap(final?DataSource?dataSource)?{XADataSource?result?=?createXADataSource();setProperties(result,?getDatabaseAccessConfiguration(dataSource));return?result;}
  • 很簡(jiǎn)明,第一步創(chuàng)建,XADataSource,第二步給它設(shè)置屬性(包含數(shù)據(jù)的連接,用戶(hù)名密碼等),然后返回。

  • 返回 XATransactionDataSource 類(lèi),關(guān)注xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource); 從名字可以看出,這是注冊(cè)事務(wù)恢復(fù)資源。這個(gè)我們?cè)谑聞?wù)恢復(fù)的時(shí)候詳解。

  • 返回 XAShardingTransactionManager.init() ,我們重點(diǎn)來(lái)關(guān)注:xaTransactionManager.init();,最后進(jìn)入AtomikosTransactionManager.init()。流程圖如下:

代碼:

public?final?class?AtomikosTransactionManager?implements?XATransactionManager?{private?final?UserTransactionManager?transactionManager?=?new?UserTransactionManager();private?final?UserTransactionService?userTransactionService?=?new?UserTransactionServiceImp();@Overridepublic?void?init()?{userTransactionService.init();}}
  • 進(jìn)入U(xiǎn)serTransactionServiceImp.init()

private?void?initialize()?{//添加恢復(fù)資源?不用關(guān)心for?(RecoverableResource?resource?:?resources_)?{Configuration.addResource?(?resource?);}for?(LogAdministrator?logAdministrator?:?logAdministrators_)?{Configuration.addLogAdministrator?(?logAdministrator?);}//注冊(cè)插件?不用關(guān)心for?(TransactionServicePlugin?nxt?:?tsListeners_)?{Configuration.registerTransactionServicePlugin?(?nxt?);}//獲取配置屬性?重點(diǎn)關(guān)心ConfigProperties?configProps?=?Configuration.getConfigProperties();configProps.applyUserSpecificProperties(properties_);//進(jìn)行初始化Configuration.init();}
  • 我們重點(diǎn)關(guān)注,獲取配置屬性。最后進(jìn)入com.atomikos.icatch.provider.imp.AssemblerImp.initializeProperties()方法。

?@Overridepublic?ConfigProperties?initializeProperties()?{//讀取classpath下的默認(rèn)配置transactions-defaults.propertiesProperties?defaults?=?new?Properties();loadPropertiesFromClasspath(defaults,?DEFAULT_PROPERTIES_FILE_NAME);//讀取classpath下,transactions.properties配置,覆蓋transactions-defaults.properties中相同key的值Properties?transactionsProperties?=?new?Properties(defaults);loadPropertiesFromClasspath(transactionsProperties,?TRANSACTIONS_PROPERTIES_FILE_NAME);//讀取classpath下,jta.properties,覆蓋transactions-defaults.properties、transactions.properties中相同key的值Properties?jtaProperties?=?new?Properties(transactionsProperties);loadPropertiesFromClasspath(jtaProperties,?JTA_PROPERTIES_FILE_NAME);//讀取通過(guò)java?-Dcom.atomikos.icatch.file方式指定的自定義配置文件路徑,覆蓋之前的同名配置Properties?customProperties?=?new?Properties(jtaProperties);loadPropertiesFromCustomFilePath(customProperties);//最終構(gòu)造一個(gè)ConfigProperties對(duì)象,來(lái)表示實(shí)際要使用的配置Properties?finalProperties?=?new?Properties(customProperties);return?new?ConfigProperties(finalProperties);}
  • 接下來(lái)重點(diǎn)關(guān)注, Configuration.init(), 進(jìn)行初始化。

ublic?static?synchronized?boolean?init()?{boolean?startupInitiated?=?false;if?(service_?==?null)?{startupInitiated?=?true;//SPI方式加載插件注冊(cè),無(wú)需過(guò)多關(guān)心addAllTransactionServicePluginServicesFromClasspath();ConfigProperties?configProperties?=?getConfigProperties();//調(diào)用插件的beforeInit方法進(jìn)行初始化話(huà),無(wú)需過(guò)多關(guān)心notifyBeforeInit(configProperties);//進(jìn)行事務(wù)日志恢復(fù)的初始化,很重要,接下來(lái)詳解assembleSystemComponents(configProperties);//進(jìn)入系統(tǒng)注解的初始化,一般重要initializeSystemComponents(configProperties);notifyAfterInit();if?(configProperties.getForceShutdownOnVmExit())?{addShutdownHook(new?ForceShutdownHook());}}return?startupInitiated;}
  • 我們先來(lái)關(guān)注 assembleSystemComponents(configProperties); 進(jìn)入它,進(jìn)入com.atomikos.icatch.provider.imp.AssemblerImp.assembleTransactionService()方法:

@Overridepublic?TransactionServiceProvider?assembleTransactionService(ConfigProperties?configProperties)?{RecoveryLog?recoveryLog?=null;//打印日志logProperties(configProperties.getCompletedProperties());//生成唯一名字String?tmUniqueName?=?configProperties.getTmUniqueName();long?maxTimeout?=?configProperties.getMaxTimeout();int?maxActives?=?configProperties.getMaxActives();boolean?threaded2pc?=?configProperties.getThreaded2pc();//SPI方式加載OltpLog?,這是最重要的擴(kuò)展地方,如果用戶(hù)沒(méi)有SPI的方式去擴(kuò)展那么就為nullOltpLog?oltpLog?=?createOltpLogFromClasspath();if?(oltpLog?==?null)?{LOGGER.logInfo("Using?default?(local)?logging?and?recovery...");//創(chuàng)建事務(wù)日志存儲(chǔ)資源Repository?repository?=?createRepository(configProperties);oltpLog?=?createOltpLog(repository);//????Assemble?recoveryLogrecoveryLog?=?createRecoveryLog(repository);}StateRecoveryManagerImp?recoveryManager?=?new?StateRecoveryManagerImp();recoveryManager.setOltpLog(oltpLog);//生成唯一id生成器,以后生成XID會(huì)用的到UniqueIdMgr?idMgr?=?new?UniqueIdMgr?(?tmUniqueName?);int?overflow?=?idMgr.getMaxIdLengthInBytes()?-?MAX_TID_LENGTH;if?(?overflow?>?0?)?{//?see?case?73086String?msg?=?"Value?too?long?:?"?+?tmUniqueName;LOGGER.logFatal?(?msg?);throw?new?SysException(msg);}return?new?TransactionServiceImp(tmUniqueName,?recoveryManager,?idMgr,?maxTimeout,?maxActives,?!threaded2pc,?recoveryLog);}
  • 我們重點(diǎn)來(lái)分析createOltpLogFromClasspath(), 采用SPI的加載方式來(lái)獲取,默認(rèn)這里會(huì)返回 null, 什么意思呢?就是當(dāng)沒(méi)有擴(kuò)展的時(shí)候,atomikos,會(huì)創(chuàng)建框架自定義的資源,來(lái)存儲(chǔ)事務(wù)日志。

private?OltpLog?createOltpLogFromClasspath()?{OltpLog?ret?=?null;ServiceLoader<OltpLogFactory>?loader?=?ServiceLoader.load(OltpLogFactory.class,Configuration.class.getClassLoader());int?i?=?0;for?(OltpLogFactory?l?:?loader?)?{ret?=?l.createOltpLog();i++;}if?(i?>?1)?{String?msg?=?"More?than?one?OltpLogFactory?found?in?classpath?-?error?in?configuration!";LOGGER.logFatal(msg);throw?new?SysException(msg);}return?ret;}
  • 我們跟著進(jìn)入 Repository repository = createRepository(configProperties);

?private?CachedRepository?createCoordinatorLogEntryRepository(ConfigProperties?configProperties)?throws?LogException?{//創(chuàng)建內(nèi)存資源存儲(chǔ)InMemoryRepository?inMemoryCoordinatorLogEntryRepository?=?new?InMemoryRepository();//進(jìn)行初始化inMemoryCoordinatorLogEntryRepository.init();//創(chuàng)建使用文件存儲(chǔ)資源作為backupFileSystemRepository?backupCoordinatorLogEntryRepository?=?new?FileSystemRepository();//進(jìn)行初始化backupCoordinatorLogEntryRepository.init();//內(nèi)存與file資源進(jìn)行合并CachedRepository?repository?=?new?CachedRepository(inMemoryCoordinatorLogEntryRepository,?backupCoordinatorLogEntryRepository);repository.init();return?repository;}
  • 這里就會(huì)創(chuàng)建出 CachedRepository,里面包含了 ?InMemoryRepository 與 FileSystemRepository

  • 回到主線(xiàn) com.atomikos.icatch.config.Configuration.init(), 最后來(lái)分析下notifyAfterInit();

?private?static?void?notifyAfterInit()?{//進(jìn)行插件的初始化for?(TransactionServicePlugin?p?:?tsListenersList_)?{p.afterInit();}for?(LogAdministrator?a?:?logAdministrators_)?{a.registerLogControl(service_.getLogControl());}//設(shè)置事務(wù)恢復(fù)服務(wù),進(jìn)行事務(wù)的恢復(fù)for?(RecoverableResource?r?:?resourceList_?)?{r.setRecoveryService(recoveryService_);}}
  • 插件的初始化會(huì)進(jìn)入com.atomikos.icatch.jta.JtaTransactionServicePlugin.afterInit()

?public?void?afterInit()?{TransactionManagerImp.installTransactionManager(Configuration.getCompositeTransactionManager(),?autoRegisterResources);//如果我們自定義擴(kuò)展了?OltpLog?,這里就會(huì)返回null,如果是null,那么XaResourceRecoveryManager就是nullRecoveryLog?recoveryLog?=?Configuration.getRecoveryLog();long?maxTimeout?=?Configuration.getConfigProperties().getMaxTimeout();if?(recoveryLog?!=?null)?{XaResourceRecoveryManager.installXaResourceRecoveryManager(new?DefaultXaRecoveryLog(recoveryLog,?maxTimeout),Configuration.getConfigProperties().getTmUniqueName());}}
  • 重點(diǎn)注意 RecoveryLog recoveryLog = Configuration.getRecoveryLog(); ,如果用戶(hù)采用SPI的方式,擴(kuò)展了com.atomikos.recovery.OltpLog ,這里就會(huì)返回 null。如果是null,則不會(huì)對(duì) XaResourceRecoveryManager 進(jìn)行初始化。

  • 回到 notifyAfterInit(), 我們來(lái)分析 setRecoveryService。

public?void?setRecoveryService?(?RecoveryService?recoveryService?)throws?ResourceException{if?(?recoveryService?!=?null?)?{if?(?LOGGER.isTraceEnabled()?)?LOGGER.logTrace?(?"Installing?recovery?service?on?resource?"+?getName?()?);this.branchIdentifier=recoveryService.getName();recover();}}
  • 我們進(jìn)入 recover() 方法:

?public?void?recover()?{XaResourceRecoveryManager?xaResourceRecoveryManager?=?XaResourceRecoveryManager.getInstance();//null?for?LogCloud?recoveryif?(xaResourceRecoveryManager?!=?null)?{try?{xaResourceRecoveryManager.recover(getXAResource());}?catch?(Exception?e)?{refreshXAResource();?//cf?case?156968}}}
  • 看到最關(guān)鍵的注釋了嗎,如果用戶(hù)采用SPI的方式,擴(kuò)展了com.atomikos.recovery.OltpLog,那么XaResourceRecoveryManager 為null,則就會(huì)進(jìn)行云端恢復(fù),反之則進(jìn)行事務(wù)恢復(fù)。事務(wù)恢復(fù)很復(fù)雜,我們會(huì)單獨(dú)來(lái)講。

到這里atomikos的基本的初始化已經(jīng)完成。

atomikos事務(wù)begin流程

我們知道,本地的事務(wù),都會(huì)有一個(gè) trainsaction.begin, 對(duì)應(yīng)XA分布式事務(wù)來(lái)說(shuō)也不另外,我們?cè)侔阉悸非袚Q回XAShardingTransactionManager.begin(), 會(huì)調(diào)用com.atomikos.icatch.jta.TransactionManagerImp.begin()。流程圖如下:

代碼:

??public?void?begin?(?int?timeout?)?throws?NotSupportedException,SystemException{CompositeTransaction?ct?=?null;ResumePreviousTransactionSubTxAwareParticipant?resumeParticipant?=?null;ct?=?compositeTransactionManager.getCompositeTransaction();if?(?ct?!=?null?&&?ct.getProperty?(??JTA_PROPERTY_NAME?)?==?null?)?{LOGGER.logWarning?(?"JTA:?temporarily?suspending?incompatible?transaction:?"?+?ct.getTid()?+"?(will?be?resumed?after?JTA?transaction?ends)"?);ct?=?compositeTransactionManager.suspend();resumeParticipant?=?new?ResumePreviousTransactionSubTxAwareParticipant?(?ct?);}try?{//創(chuàng)建事務(wù)補(bǔ)償點(diǎn)ct?=?compositeTransactionManager.createCompositeTransaction?(?(?(?long?)?timeout?)?*?1000?);if?(?resumeParticipant?!=?null?)?ct.addSubTxAwareParticipant?(?resumeParticipant?);if?(?ct.isRoot?()?&&?getDefaultSerial?()?)ct.setSerial?();ct.setProperty?(?JTA_PROPERTY_NAME?,?"true"?);}?catch?(?SysException?se?)?{String?msg?=?"Error?in?begin()";LOGGER.logError(?msg?,?se?);throw?new?ExtendedSystemException?(?msg?,?se?);}recreateCompositeTransactionAsJtaTransaction(ct);}
  • 這里我們主要關(guān)注 compositeTransactionManager.createCompositeTransaction(),

public?CompositeTransaction?createCompositeTransaction?(?long?timeout?)?throws?SysException{CompositeTransaction?ct?=?null?,?ret?=?null;ct?=?getCurrentTx?();if?(?ct?==?null?)?{ret?=?getTransactionService().createCompositeTransaction?(?timeout?);if(LOGGER.isDebugEnabled()){LOGGER.logDebug("createCompositeTransaction?(?"?+?timeout?+?"?):?"+?"created?new?ROOT?transaction?with?id?"?+?ret.getTid?());}}?else?{if(LOGGER.isDebugEnabled())?LOGGER.logDebug("createCompositeTransaction?(?"?+?timeout?+?"?)");ret?=?ct.createSubTransaction?();}Thread?thread?=?Thread.currentThread?();setThreadMappings?(?ret,?thread?);return?ret;}
  • 創(chuàng)建了事務(wù)補(bǔ)償點(diǎn),然后把他放到了用當(dāng)前線(xiàn)程作為key的Map當(dāng)中,這里思考,為啥它不用 threadLocal。

到這里atomikos的事務(wù)begin流程已經(jīng)完成。大家可能有些疑惑,begin好像什么都沒(méi)有做,XA start 也沒(méi)調(diào)用?別慌,下一節(jié)繼續(xù)來(lái)講。

XATransactionDataSource getConnection() 流程

我們都知道想要執(zhí)行SQL語(yǔ)句,必須要獲取到數(shù)據(jù)庫(kù)的connection。讓我們?cè)倩氐?XAShardingTransactionManager.getConnection() 最后會(huì)調(diào)用到org.apache.shardingsphere.transaction.xa.jta.datasourceXATransactionDataSource.getConnection()。流程圖如下:

代碼 :

?public?Connection?getConnection()?throws?SQLException,?SystemException,?RollbackException?{//先檢查是否已經(jīng)有存在的connection,這一步很關(guān)心,也是XA的關(guān)鍵,因?yàn)閄A事務(wù),必須在同一個(gè)connectionif?(CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName()))?{return?dataSource.getConnection();}//獲取數(shù)據(jù)庫(kù)連接Connection?result?=?dataSource.getConnection();//轉(zhuǎn)成XAConnection,其實(shí)是同一個(gè)連接X(jué)AConnection?xaConnection?=?XAConnectionFactory.createXAConnection(databaseType,?xaDataSource,?result);//獲取JTA事務(wù)定義接口Transaction?transaction?=?xaTransactionManager.getTransactionManager().getTransaction();if?(!enlistedTransactions.get().contains(transaction))?{//進(jìn)行資源注冊(cè)transaction.enlistResource(new?SingleXAResource(resourceName,?xaConnection.getXAResource()));transaction.registerSynchronization(new?Synchronization()?{@Overridepublic?void?beforeCompletion()?{enlistedTransactions.get().remove(transaction);}@Overridepublic?void?afterCompletion(final?int?status)?{enlistedTransactions.get().clear();}});enlistedTransactions.get().add(transaction);}return?result;}
  • 首先第一步很關(guān)心,尤其是對(duì)shardingsphere來(lái)說(shuō),因?yàn)樵谝粋€(gè)事務(wù)里面,會(huì)有多個(gè)SQL語(yǔ)句,打到相同的數(shù)據(jù)庫(kù),所以對(duì)相同的數(shù)據(jù)庫(kù),必須獲取同一個(gè)XAConnection,這樣才能進(jìn)行XA事務(wù)的提交與回滾。

  • 我們接下來(lái)關(guān)心 transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, 會(huì)進(jìn)入com.atomikos.icatch.jta.TransactionImp.enlistResource(), 代碼太長(zhǎng),截取一部分。

try?{restx?=?(XAResourceTransaction)?res.getResourceTransaction(this.compositeTransaction);//?next,?we?MUST?set?the?xa?resource?again,//?because?ONLY?the?instance?we?got?as?argument//?is?available?for?use?now?!//?older?instances?(set?in?restx?from?previous?sibling)//?have?connections?that?may?be?in?reuse?already//?->old?xares?not?valid?except?for?2pc?operationsrestx.setXAResource(xares);restx.resume();}?catch?(ResourceException?re)?{throw?new?ExtendedSystemException("Unexpected?error?during?enlist",?re);}?catch?(RuntimeException?e)?{throw?e;}addXAResourceTransaction(restx,?xares);
  • 我們直接看 restx.resume();

public?synchronized?void?resume()?throws?ResourceException?{int?flag?=?0;String?logFlag?=?"";if?(this.state.equals(TxState.LOCALLY_DONE))?{//?reused?instanceflag?=?XAResource.TMJOIN;logFlag?=?"XAResource.TMJOIN";}?else?if?(!this.knownInResource)?{//?new?instanceflag?=?XAResource.TMNOFLAGS;logFlag?=?"XAResource.TMNOFLAGS";}?elsethrow?new?IllegalStateException("Wrong?state?for?resume:?"+?this.state);try?{if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.start?(?"?+?this.xidToHexString+?"?,?"?+?logFlag?+?"?)?on?resource?"+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}this.xaresource.start(this.xid,?flag);}?catch?(XAException?xaerr)?{String?msg?=?interpretErrorCode(this.resourcename,?"resume",this.xid,?xaerr.errorCode);LOGGER.logWarning(msg,?xaerr);throw?new?ResourceException(msg,?xaerr);}setState(TxState.ACTIVE);this.knownInResource?=?true;}
  • 哦多尅,看見(jiàn)了嗎,各位,看見(jiàn)了 this.xaresource.start(this.xid, flag); 了嗎????,我們進(jìn)去,假設(shè)我們使用的Mysql數(shù)據(jù)庫(kù):

?public?void?start(Xid?xid,?int?flags)?throws?XAException?{StringBuilder?commandBuf?=?new?StringBuilder(300);commandBuf.append("XA?START?");appendXid(commandBuf,?xid);switch(flags)?{case?0:break;case?2097152:commandBuf.append("?JOIN");break;case?134217728:commandBuf.append("?RESUME");break;default:throw?new?XAException(-5);}this.dispatchCommand(commandBuf.toString());this.underlyingConnection.setInGlobalTx(true);}
  • 組裝XA start Xid SQL語(yǔ)句,進(jìn)行執(zhí)行。

到這里,我們總結(jié)下,在獲取數(shù)據(jù)庫(kù)連接的時(shí)候,我們執(zhí)行了XA協(xié)議接口中的 XA start xid

atomikos事務(wù)commit流程

好了,上面我們已經(jīng)開(kāi)啟了事務(wù),現(xiàn)在我們來(lái)分析下事務(wù)commit流程,我們?cè)侔岩暯乔袚Q回XAShardingTransactionManager.commit(),最后我們會(huì)進(jìn)入com.atomikos.icatch.imp.CompositeTransactionImp.commit() 方法。流程圖如下:

代碼:

?public?void?commit?()?throws?HeurRollbackException,?HeurMixedException,HeurHazardException,?SysException,?SecurityException,RollbackException{//首先更新下事務(wù)日志的狀態(tài)doCommit?();setSiblingInfoForIncoming1pcRequestFromRemoteClient();if?(?isRoot?()?)?{//真正的commit操作coordinator.terminate?(?true?);}}
  • 我們關(guān)注coordinator.terminate ( true );

?protected?void?terminate?(?boolean?commit?)?throws?HeurRollbackException,HeurMixedException,?SysException,?java.lang.SecurityException,HeurCommitException,?HeurHazardException,?RollbackException,IllegalStateException{synchronized?(?fsm_?)?{if?(?commit?)?{//判斷有幾個(gè)參與者,如果只有一個(gè),直接提交if?(?participants_.size?()?<=?1?)?{commit?(?true?);}?else?{//否則,走XA?2階段提交流程,先prepare,?再提交int?prepareResult?=?prepare?();//?make?sure?to?only?do?commit?if?NOT?read?onlyif?(?prepareResult?!=?Participant.READ_ONLY?)commit?(?false?);}}?else?{rollback?();}}}
  • 首先會(huì)判斷參與者的個(gè)數(shù),這里我們可以理解為MySQL的database數(shù)量,如果只有一個(gè),退化成一階段,直接提交。如果有多個(gè),則走標(biāo)準(zhǔn)的XA二階段提交流程。

  • 我們來(lái)看 prepare (); 流程,最后會(huì)走到com.atomikos.icatch.imp.PrepareMessage.send() ---> com.atomikos.datasource.xa.XAResourceTransaction.prepare()

int?ret?=?0;terminateInResource();if?(TxState.ACTIVE?==?this.state)?{//?tolerate?non-delisting?apps/serverssuspend();}//?duplicate?prepares?can?happen?for?siblings?in?serial?subtxs!!!//?in?that?case,?the?second?prepare?just?returns?READONLYif?(this.state?==?TxState.IN_DOUBT)return?Participant.READ_ONLY;else?if?(!(this.state?==?TxState.LOCALLY_DONE))throw?new?SysException("Wrong?state?for?prepare:?"?+?this.state);try?{//?refresh?xaresource?for?MQSeries:?seems?to?close?XAResource?after//?suspend???testOrRefreshXAResourceFor2PC();if?(LOGGER.isTraceEnabled())?{LOGGER.logTrace("About?to?call?prepare?on?XAResource?instance:?"+?this.xaresource);}ret?=?this.xaresource.prepare(this.xid);}?catch?(XAException?xaerr)?{String?msg?=?interpretErrorCode(this.resourcename,?"prepare",this.xid,?xaerr.errorCode);if?(XAException.XA_RBBASE?<=?xaerr.errorCode&&?xaerr.errorCode?<=?XAException.XA_RBEND)?{LOGGER.logWarning(msg,?xaerr);?//?see?case?84253throw?new?RollbackException(msg);}?else?{LOGGER.logError(msg,?xaerr);throw?new?SysException(msg,?xaerr);}}setState(TxState.IN_DOUBT);if?(ret?==?XAResource.XA_RDONLY)?{if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.prepare?(?"?+?this.xidToHexString+?"?)?returning?XAResource.XA_RDONLY?"?+?"on?resource?"+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}return?Participant.READ_ONLY;}?else?{if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.prepare?(?"?+?this.xidToHexString+?"?)?returning?OK?"?+?"on?resource?"+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}return?Participant.READ_ONLY?+?1;}
  • 終于,我們看到了這么一句 ret = this.xaresource.prepare(this.xid); 但是等等,我們之前不是說(shuō)了,XA start xid 以后要先 XA end xid 嗎?答案就在 suspend(); 里面。

public?synchronized?void?suspend()?throws?ResourceException?{//?BugzID:?20545//?State?may?be?IN_DOUBT?or?TERMINATED?when?a?connection?is?closed?AFTER//?commit!//?In?that?case,?don't?call?END?again,?and?also?don't?generate?any//?error!//?This?is?required?for?some?hibernate?connection?release?strategies.if?(this.state.equals(TxState.ACTIVE))?{try?{if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.end?(?"?+?this.xidToHexString+?"?,?XAResource.TMSUCCESS?)?on?resource?"+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}//執(zhí)行了?xa?end?語(yǔ)句this.xaresource.end(this.xid,?XAResource.TMSUCCESS);}?catch?(XAException?xaerr)?{String?msg?=?interpretErrorCode(this.resourcename,?"end",this.xid,?xaerr.errorCode);if?(LOGGER.isTraceEnabled())LOGGER.logTrace(msg,?xaerr);//?don't?throw:?fix?for?case?102827}setState(TxState.LOCALLY_DONE);}}

到了這里,我們已經(jīng)執(zhí)行了 XA start xid -> XA end xid --> XA prepare xid, 接下來(lái)就是最后一步 commit

  • 我們?cè)倩氐?terminate(false) 方法,來(lái)看 commit()流程。其實(shí)和 prepare流程一樣,最后會(huì)走到 com.atomikos.datasource.xa.XAResourceTransaction.commit()。commit執(zhí)行完,數(shù)據(jù)提交

//繁雜代碼過(guò)多,就顯示核心的 this.xaresource.commit(this.xid,?onePhase);

思考:這里的參與者提交是在一個(gè)循環(huán)里面,一個(gè)一個(gè)提交的,如果之前的提交了,后面的參與者提交的時(shí)候,掛了,就會(huì)造成數(shù)據(jù)的不一致性。

Atomikos rollback() 流程

上面我們已經(jīng)分析了commit流程,其實(shí)rollback流程和commit流程一樣,我們?cè)诎涯抗馇袚Q回 org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback() ,最后會(huì)執(zhí)行到com.atomikos.icatch.imp.CompositeTransactionImp.rollback()。

????public?void?rollback?()?throws?IllegalStateException,?SysException{//清空資源,更新事務(wù)日志狀態(tài)等doRollback?();if?(?isRoot?()?)?{try?{coordinator.terminate?(?false?);}?catch?(?Exception?e?)?{throw?new?SysException?(?"Unexpected?error?in?rollback:?"?+?e.getMessage?(),?e?);}}}
  • 重點(diǎn)關(guān)注 coordinator.terminate ( false ); ,這個(gè)和 commit流程是一樣的,只不過(guò)在 commit流程里面,參數(shù)傳的是true。

?protected?void?terminate?(?boolean?commit?)?throws?HeurRollbackException,HeurMixedException,?SysException,?java.lang.SecurityException,HeurCommitException,?HeurHazardException,?RollbackException,IllegalStateException{synchronized?(?fsm_?)?{if?(?commit?)?{if?(?participants_.size?()?<=?1?)?{commit?(?true?);}?else?{int?prepareResult?=?prepare?();//?make?sure?to?only?do?commit?if?NOT?read?onlyif?(?prepareResult?!=?Participant.READ_ONLY?)commit?(?false?);}}?else?{//如果是false,走的是rollbackrollback?();}}}
  • 我們重點(diǎn)關(guān)注 rollback() ,最后會(huì)走到com.atomikos.datasource.xa.XAResourceTransaction.rollback()。

public?synchronized?void?rollback()throws?HeurCommitException,?HeurMixedException,HeurHazardException,?SysException?{terminateInResource();if?(rollbackShouldDoNothing())?{return;}if?(this.state.equals(TxState.TERMINATED))?{return;}if?(this.state.equals(TxState.HEUR_MIXED))throw?new?HeurMixedException();if?(this.state.equals(TxState.HEUR_COMMITTED))throw?new?HeurCommitException();if?(this.xaresource?==?null)?{throw?new?HeurHazardException("XAResourceTransaction?"+?getXid()?+?":?no?XAResource?to?rollback?");}try?{if?(this.state.equals(TxState.ACTIVE))?{?//?first?suspend?xidsuspend();}//?refresh?xaresource?for?MQSeries:?seems?to?close?XAResource?after//?suspend???testOrRefreshXAResourceFor2PC();if?(LOGGER.isDebugEnabled())?{LOGGER.logDebug("XAResource.rollback?(?"?+?this.xidToHexString+?"?)?"?+?"on?resource?"?+?this.resourcename+?"?represented?by?XAResource?instance?"+?this.xaresource);}this.xaresource.rollback(this.xid);

先在supend()方法里面執(zhí)行了 XA end xid 語(yǔ)句, 接下來(lái)執(zhí)行 this.xaresource.rollback(this.xid); 進(jìn)行數(shù)據(jù)的回滾。

Atomikos-recover 流程

說(shuō)事務(wù)恢復(fù)流程之前,我們來(lái)討論下,會(huì)啥會(huì)出現(xiàn)事務(wù)恢復(fù)?XA二階段提交協(xié)議不是強(qiáng)一致性的嗎?要解答這個(gè)問(wèn)題,我們就要來(lái)看看XA二階段協(xié)議有什么問(wèn)題?

問(wèn)題一 :單點(diǎn)故障

由于協(xié)調(diào)者的重要性,一旦協(xié)調(diào)者TM發(fā)生故障。參與者RM會(huì)一直阻塞下去。尤其在第二階段,協(xié)調(diào)者發(fā)生故障,那么所有的參與者還都處于鎖定事務(wù)資源的狀態(tài)中,而無(wú)法繼續(xù)完成事務(wù)操作。(如果是協(xié)調(diào)者掛掉,可以重新選舉一個(gè)協(xié)調(diào)者,但是無(wú)法解決因?yàn)閰f(xié)調(diào)者宕機(jī)導(dǎo)致的參與者處于阻塞狀態(tài)的問(wèn)題)

問(wèn)題二 :數(shù)據(jù)不一致

數(shù)據(jù)不一致。在二階段提交的階段二中,當(dāng)協(xié)調(diào)者向參與者發(fā)送commit請(qǐng)求之后,發(fā)生了局部網(wǎng)絡(luò)異常或者在發(fā)送commit請(qǐng)求過(guò)程中協(xié)調(diào)者發(fā)生了故障,這會(huì)導(dǎo)致只有一部分參與者接受到了commit請(qǐng)求。而在這部分參與者接到commit請(qǐng)求之后就會(huì)執(zhí)行commit操作。但是其他部分未接到commit請(qǐng)求的機(jī)器則無(wú)法執(zhí)行事務(wù)提交。于是整個(gè)分布式系統(tǒng)便出現(xiàn)了數(shù)據(jù)不一致性的現(xiàn)象。

如何解決?

解決的方案簡(jiǎn)單,就是我們?cè)谑聞?wù)的操作的每一步,我們都需要對(duì)事務(wù)狀態(tài)的日志進(jìn)行人為的記錄,我們可以把日志記錄存儲(chǔ)在我們想存儲(chǔ)的地方,可以是本地存儲(chǔ),也可以中心化的存儲(chǔ)。atomikos的開(kāi)源版本,我們之前也分析了,它是使用內(nèi)存 + file的方式,存儲(chǔ)在本地,這樣的話(huà),如果在一個(gè)集群系統(tǒng)里面,如果有節(jié)點(diǎn)宕機(jī),日志又存儲(chǔ)在本地,所以事務(wù)不能及時(shí)的恢復(fù)(需要重啟服務(wù))。

Atomikos 多場(chǎng)景下事務(wù)恢復(fù)。

Atomikos 提供了二種方式,來(lái)應(yīng)對(duì)不同場(chǎng)景下的異常情況。

  • 場(chǎng)景一:服務(wù)節(jié)點(diǎn)不宕機(jī),因?yàn)槠渌脑?#xff0c;產(chǎn)生需要事務(wù)恢復(fù)的情況。這個(gè)時(shí)候才要定時(shí)任務(wù)進(jìn)行恢復(fù)。具體的代碼 com.atomikos.icatch.imp.TransactionServiceImp.init() 方法,會(huì)初始化一個(gè)定時(shí)任務(wù),進(jìn)行事務(wù)的恢復(fù)。

public?synchronized?void?init?(?Properties?properties?)?throws?SysException{shutdownInProgress_?=?false;control_?=?new?com.atomikos.icatch.admin.imp.LogControlImp?(?(AdminLog)?this.recoveryLog?);ConfigProperties?configProperties?=?new?ConfigProperties(properties);long?recoveryDelay?=?configProperties.getRecoveryDelay();recoveryTimer?=?new?PooledAlarmTimer(recoveryDelay);recoveryTimer.addAlarmTimerListener(new?AlarmTimerListener()?{@Overridepublic?void?alarm(AlarmTimer?timer)?{//進(jìn)行事務(wù)恢復(fù)performRecovery();}});TaskManager.SINGLETON.executeTask(recoveryTimer);initialized_?=?true;}
  • 最終會(huì)進(jìn)入com.atomikos.datasource.xa.XATransactionalResource.recover() 方法。

???public?void?recover()?{XaResourceRecoveryManager?xaResourceRecoveryManager?=?XaResourceRecoveryManager.getInstance();if?(xaResourceRecoveryManager?!=?null)?{?//null?for?LogCloud?recoverytry?{xaResourceRecoveryManager.recover(getXAResource());}?catch?(Exception?e)?{refreshXAResource();?//cf?case?156968}}}
  • 場(chǎng)景二: 當(dāng)服務(wù)節(jié)點(diǎn)宕機(jī)重啟動(dòng)過(guò)程中進(jìn)行事務(wù)的恢復(fù)。具體實(shí)現(xiàn)在com.atomikos.datasource.xa.XATransactionalResource.setRecoveryService()方法里面

?@Overridepublic?void?setRecoveryService?(?RecoveryService?recoveryService?)throws?ResourceException{if?(?recoveryService?!=?null?)?{if?(?LOGGER.isTraceEnabled()?)?LOGGER.logTrace?(?"Installing?recovery?service?on?resource?"+?getName?()?);this.branchIdentifier=recoveryService.getName();//進(jìn)行事務(wù)恢復(fù)recover();}}

com.atomikos.datasource.xa.XATransactionalResource.recover() 流程詳解。


主代碼:

?public?void?recover(XAResource?xaResource)?throws?XAException?{//?根據(jù)XA?recovery?協(xié)議獲取?xidList<XID>?xidsToRecover?=?retrievePreparedXidsFromXaResource(xaResource);Collection<XID>?xidsToCommit;try?{//?xid?與日志記錄的xid進(jìn)行匹配xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();for?(XID?xid?:?xidsToRecover)?{if?(xidsToCommit.contains(xid))?{//執(zhí)行?XA?commit?xid?進(jìn)行提交replayCommit(xid,?xaResource);}?else?{attemptPresumedAbort(xid,?xaResource);}}}?catch?(LogException?couldNotRetrieveCommittingXids)?{LOGGER.logWarning("Transient?error?while?recovering?-?will?retry?later...",?couldNotRetrieveCommittingXids);}}
  • 我們來(lái)看一下如何根據(jù) XA recovery 協(xié)議獲取RM端存儲(chǔ)的xid。進(jìn)入方法 retrievePreparedXidsFromXaResource(xaResource), 最后進(jìn)入 com.atomikos.datasource.xa.RecoveryScan.recoverXids()方法。

public?static?List<XID>?recoverXids(XAResource?xaResource,?XidSelector?selector)?throws?XAException?{List<XID>?ret?=?new?ArrayList<XID>();boolean?done?=?false;int?flags?=?XAResource.TMSTARTRSCAN;Xid[]?xidsFromLastScan?=?null;List<XID>?allRecoveredXidsSoFar?=?new?ArrayList<XID>();do?{xidsFromLastScan?=?xaResource.recover(flags);flags?=?XAResource.TMNOFLAGS;done?=?(xidsFromLastScan?==?null?||?xidsFromLastScan.length?==?0);if?(!done)?{//?TEMPTATIVELY?SET?done?TO?TRUE//?TO?TOLERATE?ORACLE?8.1.7?INFINITE//?LOOP?(ALWAYS?RETURNS?SAME?RECOVER//?SET).?IF?A?NEW?SET?OF?XIDS?IS?RETURNED//?THEN?done?WILL?BE?RESET?TO?FALSEdone?=?true;for?(?int?i?=?0;?i?<?xidsFromLastScan.length;?i++?)?{XID?xid?=?new?XID?(?xidsFromLastScan[i]?);//?our?own?XID?implements?equals?and?hashCode?properlyif?(!allRecoveredXidsSoFar.contains(xid))?{//?a?new?xid?is?returned?->?we?can?not?be?in?a?recovery?loop?->?go?onallRecoveredXidsSoFar.add(xid);done?=?false;if?(selector.selects(xid))?{ret.add(xid);}}}}}?while?(!done);return?ret;}
  • 我們重點(diǎn)關(guān)注xidsFromLastScan = xaResource.recover(flags); 這個(gè)方法,如果我們使用MySQL,那么就會(huì)進(jìn)入 MysqlXAConnection.recover()方法。執(zhí)行 XA recovery xid 語(yǔ)句來(lái)獲取 xid。

?protected?static?Xid[]?recover(Connection?c,?int?flag)?throws?XAException?{/**?The?XA?RECOVER?statement?returns?information?for?those?XA?transactions?on?the?MySQL?server?that?are?in?the?PREPARED?state.?(See?Section?13.4.7.2,????XA*?Transaction?States???.)?The?output?includes?a?row?for?each?such?XA?transaction?on?the?server,?regardless?of?which?client?started?it.**?XA?RECOVER?output?rows?look?like?this?(for?an?example?xid?value?consisting?of?the?parts?'abc',?'def',?and?7):**?mysql>?XA?RECOVER;*?+----------+--------------+--------------+--------+*?|?formatID?|?gtrid_length?|?bqual_length?|?data?|*?+----------+--------------+--------------+--------+*?|?7?|?3?|?3?|?abcdef?|*?+----------+--------------+--------------+--------+**?The?output?columns?have?the?following?meanings:**?formatID?is?the?formatID?part?of?the?transaction?xid*?gtrid_length?is?the?length?in?bytes?of?the?gtrid?part?of?the?xid*?bqual_length?is?the?length?in?bytes?of?the?bqual?part?of?the?xid*?data?is?the?concatenation?of?the?gtrid?and?bqual?parts?of?the?xid*/boolean?startRscan?=?((flag?&?TMSTARTRSCAN)?>?0);boolean?endRscan?=?((flag?&?TMENDRSCAN)?>?0);if?(!startRscan?&&?!endRscan?&&?flag?!=?TMNOFLAGS)?{throw?new?MysqlXAException(XAException.XAER_INVAL,?Messages.getString("MysqlXAConnection.001"),?null);}////?We?return?all?recovered?XIDs?at?once,?so?if?not??TMSTARTRSCAN,?return?no?new?XIDs////?We?don't?attempt?to?maintain?state?to?check?for?TMNOFLAGS?"outside"?of?a?scan//if?(!startRscan)?{return?new?Xid[0];}ResultSet?rs?=?null;Statement?stmt?=?null;List<MysqlXid>?recoveredXidList?=?new?ArrayList<MysqlXid>();try?{//?TODO:?Cache?this?for?lifetime?of?XAConnectionstmt?=?c.createStatement();rs?=?stmt.executeQuery("XA?RECOVER");while?(rs.next())?{final?int?formatId?=?rs.getInt(1);int?gtridLength?=?rs.getInt(2);int?bqualLength?=?rs.getInt(3);byte[]?gtridAndBqual?=?rs.getBytes(4);final?byte[]?gtrid?=?new?byte[gtridLength];final?byte[]?bqual?=?new?byte[bqualLength];if?(gtridAndBqual.length?!=?(gtridLength?+?bqualLength))?{throw?new?MysqlXAException(XAException.XA_RBPROTO,?Messages.getString("MysqlXAConnection.002"),?null);}System.arraycopy(gtridAndBqual,?0,?gtrid,?0,?gtridLength);System.arraycopy(gtridAndBqual,?gtridLength,?bqual,?0,?bqualLength);recoveredXidList.add(new?MysqlXid(gtrid,?bqual,?formatId));}}?catch?(SQLException?sqlEx)?{throw?mapXAExceptionFromSQLException(sqlEx);}?finally?{if?(rs?!=?null)?{try?{rs.close();}?catch?(SQLException?sqlEx)?{throw?mapXAExceptionFromSQLException(sqlEx);}}if?(stmt?!=?null)?{try?{stmt.close();}?catch?(SQLException?sqlEx)?{throw?mapXAExceptionFromSQLException(sqlEx);}}}int?numXids?=?recoveredXidList.size();Xid[]?asXids?=?new?Xid[numXids];Object[]?asObjects?=?recoveredXidList.toArray();for?(int?i?=?0;?i?<?numXids;?i++)?{asXids[i]?=?(Xid)?asObjects[i];}return?asXids;}
  • 這里要注意如果Mysql的版本 <5.7.7 ,則不會(huì)有任何數(shù)據(jù),在以后的版本中Mysql進(jìn)行了修復(fù),因此如果我們想要使用MySQL充當(dāng)RM,版本必須 >= 5.7.7 ,原因是:

MySQL 5.6版本在客戶(hù)端退出的時(shí)候,自動(dòng)把已經(jīng)prepare的事務(wù)回滾了,那么MySQL為什么要這樣做?這主要取決于MySQL的內(nèi)部實(shí)現(xiàn),MySQL 5.7以前的版本,對(duì)于prepare的事務(wù),MySQL是不會(huì)記錄binlog的(官方說(shuō)是減少fsync,起到了優(yōu)化的作用)。只有當(dāng)分布式事務(wù)提交的時(shí)候才會(huì)把前面的操作寫(xiě)入binlog信息,所以對(duì)于binlog來(lái)說(shuō),分布式事務(wù)與普通的事務(wù)沒(méi)有區(qū)別,而prepare以前的操作信息都保存在連接的IO_CACHE中,如果這個(gè)時(shí)候客戶(hù)端退出了,以前的binlog信息都會(huì)被丟失,再次重連后允許提交的話(huà),會(huì)造成Binlog丟失,從而造成主從數(shù)據(jù)的不一致,所以官方在客戶(hù)端退出的時(shí)候直接把已經(jīng)prepare的事務(wù)都回滾了!

  • 回到主線(xiàn)再?gòu)淖约河涗浀氖聞?wù)日志里面獲取XID

??Collection<XID>?xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();
  • 我們來(lái)看下獲取事務(wù)日志里面的XID的retrieveExpiredCommittingXidsFromLog()方法。然后進(jìn)入com.atomikos.recovery.imp.RecoveryLogImp.getCommittingParticipants()方法。

public?Collection<ParticipantLogEntry>?getCommittingParticipants()throws?LogReadException?{Collection<ParticipantLogEntry>?committingParticipants?=?new?HashSet<ParticipantLogEntry>();Collection<CoordinatorLogEntry>?committingCoordinatorLogEntries?=?repository.findAllCommittingCoordinatorLogEntries();for?(CoordinatorLogEntry?coordinatorLogEntry?:?committingCoordinatorLogEntries)?{for?(ParticipantLogEntry?participantLogEntry?:?coordinatorLogEntry.participants)?{committingParticipants.add(participantLogEntry);}}return?committingParticipants;}

到這里我們來(lái)簡(jiǎn)單介紹一下,事務(wù)日志的存儲(chǔ)結(jié)構(gòu)。首先是 CoordinatorLogEntry,這是一次XA事務(wù)的所有信息實(shí)體類(lèi)。

public?class?CoordinatorLogEntry?implements?Serializable?{//全局事務(wù)idpublic?final?String?id;//是否已經(jīng)提交public?final?boolean?wasCommitted;/***?Only?for?subtransactions,?null?otherwise.*/public?final?String?superiorCoordinatorId;//參與者集合public?final?ParticipantLogEntry[]?participants; }
  • 再來(lái)看一下參與者實(shí)體類(lèi) ParticipantLogEntry :

public?class?ParticipantLogEntry?implements?Serializable?{private?static?final?long?serialVersionUID?=?1728296701394899871L;/***?The?ID?of?the?global?transaction?as?known?by?the?transaction?core.*/public?final?String?coordinatorId;/***?Identifies?the?participant?within?the?global?transaction.*/public?final?String?uri;/***?When?does?this?participant?expire?(expressed?in?millis?since?Jan?1,?1970)?*/public?final?long?expires;/***?Best-known?state?of?the?participant.*/public?final?TxState?state;/***?For?diagnostic?purposes,?null?if?not?relevant.*/public?final?String?resourceName; }
  • 回到com.atomikos.recovery.xa.DefaultXaRecoveryLog.getExpiredCommittingXids() 方法,可以到獲取了一次XA事務(wù)過(guò)程中,存儲(chǔ)的事務(wù)日志中的xid。

public?Set<XID>?getExpiredCommittingXids()?throws?LogReadException?{Set<XID>?ret?=?new?HashSet<XID>();Collection<ParticipantLogEntry>?entries?=?log.getCommittingParticipants();for?(ParticipantLogEntry?entry?:?entries)?{if?(expired(entry)?&&?!http(entry))?{XID?xid?=?new?XID(entry.coordinatorId,?entry.uri);ret.add(xid);}}return?ret;}
  • 如果從RM中通過(guò)XA recovery取出的XID,包含在從事務(wù)日志中取出的XID,則進(jìn)行commit,否則進(jìn)行rollback.

List<XID>?xidsToRecover?=?retrievePreparedXidsFromXaResource(xaResource);Collection<XID>?xidsToCommit;try?{xidsToCommit?=?retrieveExpiredCommittingXidsFromLog();for?(XID?xid?:?xidsToRecover)?{if?(xidsToCommit.contains(xid))?{replayCommit(xid,?xaResource);}?else?{attemptPresumedAbort(xid,?xaResource);}}}?catch?(LogException?couldNotRetrieveCommittingXids)?{LOGGER.logWarning("Transient?error?while?recovering?-?will?retry?later...",?couldNotRetrieveCommittingXids);}
  • replayCommit 方法如下:

private?void?replayCommit(XID?xid,?XAResource?xaResource)?{if?(LOGGER.isDebugEnabled())?LOGGER.logDebug("Replaying?commit?of?xid:?"?+?xid);try?{//進(jìn)行事務(wù)提交xaResource.commit(xid,?false);//更新事務(wù)日志log.terminated(xid);}?catch?(XAException?e)?{if?(alreadyHeuristicallyTerminatedByResource(e))?{handleHeuristicTerminationByResource(xid,?xaResource,?e,?true);}?else?if?(xidTerminatedInResourceByConcurrentCommit(e))?{log.terminated(xid);}?else?{LOGGER.logWarning("Transient?error?while?replaying?commit?-?will?retry?later...",?e);}}}
  • attemptPresumedAbort(xid, xaResource); 方法如下:

private?void?attemptPresumedAbort(XID?xid,?XAResource?xaResource)?{try?{log.presumedAborting(xid);if?(LOGGER.isDebugEnabled())?LOGGER.logDebug("Presumed?abort?of?xid:?"?+?xid);try?{//進(jìn)行回滾xaResource.rollback(xid);//更新日志狀態(tài)log.terminated(xid);}?catch?(XAException?e)?{if?(alreadyHeuristicallyTerminatedByResource(e))?{handleHeuristicTerminationByResource(xid,?xaResource,?e,?false);}?else?if?(xidTerminatedInResourceByConcurrentRollback(e))?{log.terminated(xid);}?else?{LOGGER.logWarning("Unexpected?exception?during?recovery?-?ignoring?to?retry?later...",?e);}}}?catch?(IllegalStateException?presumedAbortNotAllowedInCurrentLogState)?{//?ignore?to?retry?later?if?necessary}?catch?(LogException?logWriteException)?{LOGGER.logWarning("log?write?failed?for?Xid:?"+xid+",?ignoring?to?retry?later",?logWriteException);}}

總結(jié)

文章到此,已經(jīng)寫(xiě)的很長(zhǎng)很多了,我們分析了ShardingSphere對(duì)于XA方案,提供了一套SPI解決方案,對(duì)Atomikos進(jìn)行了整合,也分析了Atomikos初始化流程,開(kāi)始事務(wù)流程,獲取連接流程,提交事務(wù)流程,回滾事務(wù)流程,事務(wù)恢復(fù)流程。希望對(duì)大家理解XA的原理有所幫助。

加入我們

Apache ShardingSphere 一直踐行Apache Way的開(kāi)源之道,社區(qū)完全開(kāi)放與平等,人人享受開(kāi)源帶來(lái)的快樂(lè)。

地址: https://github.com/apache/shardingsphere

作者介紹:肖宇,Apache ShardingSphere Committer,開(kāi)源hmily分布式事務(wù)框架作者, 開(kāi)源soul網(wǎng)關(guān)作者,熱愛(ài)開(kāi)源,追求寫(xiě)優(yōu)雅代碼。目前就職入京東數(shù)科,參與ShardingSphere的開(kāi)源建設(shè),以及分布式數(shù)據(jù)庫(kù)的研發(fā)工作。

想知道更多?描下面的二維碼關(guān)注我

后臺(tái)回復(fù)"技術(shù)",加入技術(shù)群

【精彩推薦】

  • 原創(chuàng)|OpenAPI標(biāo)準(zhǔn)規(guī)范

  • 如此簡(jiǎn)單| ES最全詳細(xì)使用教程

  • ClickHouse到底是什么?為什么如此牛逼!

  • 原來(lái)ElasticSearch還可以這么理解

  • 面試官:InnoDB中一棵B+樹(shù)可以存放多少行數(shù)據(jù)?

  • 微服務(wù)下如何解耦?對(duì)于已經(jīng)緊耦合下如何重構(gòu)?

  • 如何構(gòu)建一套高性能、高可用、低成本的視頻處理系統(tǒng)?

  • 架構(gòu)之道:分離業(yè)務(wù)邏輯和技術(shù)細(xì)節(jié)

  • 星巴克不使用兩階段提交

點(diǎn)個(gè)贊+在看,少個(gè) bug?????

總結(jié)

以上是生活随笔為你收集整理的深度剖析Apache Shardingsphere对分布式事务的支持的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。