事务回滚什么意思 try_分布式事务 TCC-Transaction 源码分析——事务恢复
1. 概述
本文分享 TCC 恢復(fù)。主要涉及如下二個(gè) package 路徑下的類(lèi):
- org.mengyun.tcctransaction.recover
- RecoverConfig,事務(wù)恢復(fù)配置接口
- TransactionRecovery,事務(wù)恢復(fù)邏輯
- org.mengyun.tcctransaction.spring.recover :
- DefaultRecoverConfig,默認(rèn)事務(wù)恢復(fù)配置實(shí)現(xiàn)
- RecoverScheduledJob,事務(wù)恢復(fù)定時(shí)任務(wù)
本文涉及到的類(lèi)關(guān)系如下圖( 打開(kāi)大圖 ):
在《TCC-Transaction 源碼分析 —— 事務(wù)存儲(chǔ)器》中,事務(wù)信息被持久化到外部的存儲(chǔ)器中。事務(wù)存儲(chǔ)是事務(wù)恢復(fù)的基礎(chǔ)。通過(guò)讀取外部存儲(chǔ)器中的異常事務(wù),定時(shí)任務(wù)會(huì)按照一定頻率對(duì)事務(wù)進(jìn)行重試,直到事務(wù)完成或超過(guò)最大重試次數(shù)。
你行好事會(huì)因?yàn)榈玫劫澷p而愉悅同理,開(kāi)源項(xiàng)目貢獻(xiàn)者會(huì)因?yàn)?Star 而更加有動(dòng)力為 TCC-Transaction 點(diǎn)贊!傳送門(mén)ps:筆者假設(shè)你已經(jīng)閱讀過(guò)《tcc-transaction 官方文檔 —— 使用指南1.2.x》。
2. 事務(wù)重試配置
org.mengyun.tcctransaction.recover.RecoverConfig,事務(wù)恢復(fù)配置接口,實(shí)現(xiàn)代碼如下:
public interface RecoverConfig {
/**
* @return 最大重試次數(shù)
*/
int getMaxRetryCount();
/**
* @return 恢復(fù)間隔時(shí)間,單位:秒
*/
int getRecoverDuration();
/**
* @return cron 表達(dá)式
*/
String getCronExpression();
/**
* @return 延遲取消異常集合
*/
Set> getDelayCancelExceptions();
/**
* 設(shè)置延遲取消異常集合
*
* @param delayRecoverExceptions 延遲取消異常集合
*/
void setDelayCancelExceptions(Set> delayRecoverExceptions);
}
- #getMaxRetryCount(),單個(gè)事務(wù)恢復(fù)最大重試次數(shù)。超過(guò)最大重試次數(shù)后,目前僅打出錯(cuò)誤日志,下文會(huì)看到實(shí)現(xiàn)。
- #getRecoverDuration(),單個(gè)事務(wù)恢復(fù)重試的間隔時(shí)間,單位:秒。
- #getCronExpression(),定時(shí)任務(wù) cron 表達(dá)式。
- #getDelayCancelExceptions(),延遲取消異常集合。
org.mengyun.tcctransaction.spring.recover.DefaultRecoverConfig,默認(rèn)事務(wù)恢復(fù)配置實(shí)現(xiàn),實(shí)現(xiàn)代碼如下:
public class DefaultRecoverConfig implements RecoverConfig {
public static final RecoverConfig INSTANCE = new DefaultRecoverConfig();
/**
* 最大重試次數(shù)
*/
private int maxRetryCount = 30;
/**
* 恢復(fù)間隔時(shí)間,單位:秒
*/
private int recoverDuration = 120;
/**
* cron 表達(dá)式
*/
private String cronExpression = "0 */1 * * * ?";
/**
* 延遲取消異常集合
*/
private Set> delayCancelExceptions = new HashSet>();
public DefaultRecoverConfig() {
delayCancelExceptions.add(OptimisticLockException.class);
delayCancelExceptions.add(SocketTimeoutException.class);
}
@Override
public void setDelayCancelExceptions(Set> delayCancelExceptions) {
this.delayCancelExceptions.addAll(delayCancelExceptions);
}
}
- maxRetryCount,單個(gè)事務(wù)恢復(fù)最大重試次數(shù) 為 30。
- recoverDuration,單個(gè)事務(wù)恢復(fù)重試的間隔時(shí)間為 120 秒。
- cronExpression,定時(shí)任務(wù) cron 表達(dá)式為 "0 */1 * * * ?",每分鐘執(zhí)行一次。如果你希望定時(shí)任務(wù)執(zhí)行的更頻繁,可以修改 cron 表達(dá)式,例如 0/30 * * * * ?,每 30 秒執(zhí)行一次。
- delayCancelExceptions,延遲取消異常集合。在 DefaultRecoverConfig 構(gòu)造方法里,預(yù)先添加了 OptimisticLockException / SocketTimeoutException 。
- 官方解釋:事務(wù)恢復(fù)的疑問(wèn)
- 這塊筆者還有一些疑問(wèn),如果有別的可能性導(dǎo)致這個(gè)情況,麻煩告知下筆者。謝謝。
- 官方解釋:為什么 tcc 事務(wù)切面中對(duì)樂(lè)觀鎖與socket超時(shí)異常不做回滾處理,只拋異常?
- 針對(duì) SocketTimeoutException :try 階段,本地參與者調(diào)用遠(yuǎn)程參與者( 遠(yuǎn)程服務(wù),例如 Dubbo,Http 服務(wù)),遠(yuǎn)程參與者 try 階段的方法邏輯執(zhí)行時(shí)間較長(zhǎng),超過(guò) Socket 等待時(shí)長(zhǎng),發(fā)生 SocketTimeoutException,如果立刻執(zhí)行事務(wù)回滾,遠(yuǎn)程參與者 try 的方法未執(zhí)行完成,可能導(dǎo)致 cancel 的方法實(shí)際未執(zhí)行( try 的方法未執(zhí)行完成,數(shù)據(jù)庫(kù)事務(wù)【非 TCC 事務(wù)】未提交,cancel 的方法讀取數(shù)據(jù)時(shí)發(fā)現(xiàn)未變更,導(dǎo)致方法實(shí)際未執(zhí)行,最終 try 的方法執(zhí)行完后,提交數(shù)據(jù)庫(kù)事務(wù)【非 TCC 事務(wù)】,較為極端 ),最終引起數(shù)據(jù)不一致。在事務(wù)恢復(fù)時(shí),會(huì)對(duì)這種情況的事務(wù)進(jìn)行取消回滾,如果此時(shí)遠(yuǎn)程參與者的 try 的方法還未結(jié)束,還是可能發(fā)生數(shù)據(jù)不一致。
- 針對(duì) OptimisticLockException :還是 SocketTimeoutException 的情況,事務(wù)恢復(fù)間隔時(shí)間小于 Socket 超時(shí)時(shí)間,此時(shí)事務(wù)恢復(fù)調(diào)用遠(yuǎn)程參與者取消回滾事務(wù),遠(yuǎn)程參與者下次更新事務(wù)時(shí),會(huì)因?yàn)闃?lè)觀鎖更新失敗,拋出 OptimisticLockException。如果 CompensableTransactionInterceptor 此時(shí)立刻取消回滾,可能會(huì)和定時(shí)任務(wù)的取消回滾沖突,因此統(tǒng)一交給定時(shí)任務(wù)處理。
3. 事務(wù)重試定時(shí)任務(wù)
org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事務(wù)恢復(fù)定時(shí)任務(wù),基于 Quartz 實(shí)現(xiàn)調(diào)度,不斷不斷不斷執(zhí)行事務(wù)恢復(fù)。實(shí)現(xiàn)代碼如下:
public class RecoverScheduledJob {
private TransactionRecovery transactionRecovery;
private TransactionConfigurator transactionConfigurator;
private Scheduler scheduler;
public void init() {
try {
// Quartz JobDetail
MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();
jobDetail.setTargetObject(transactionRecovery);
jobDetail.setTargetMethod("startRecover");
jobDetail.setName("transactionRecoveryJob");
jobDetail.setConcurrent(false); // 禁止并發(fā)
jobDetail.afterPropertiesSet();
// Quartz CronTriggerFactoryBean
CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();
cronTrigger.setBeanName("transactionRecoveryCronTrigger");
cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());
cronTrigger.setJobDetail(jobDetail.getObject());
cronTrigger.afterPropertiesSet();
// 啟動(dòng)任務(wù)調(diào)度
scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());
// 啟動(dòng) Quartz Scheduler
scheduler.start();
} catch (Exception e) {
throw new SystemException(e);
}
}
}
- 調(diào)用 MethodInvokingJobDetailFactoryBean#setConcurrent(false) 方法,禁用任務(wù)并發(fā)執(zhí)行。
- 調(diào)用 MethodInvokingJobDetailFactoryBean#setTargetObject(...) + MethodInvokingJobDetailFactoryBean#setTargetMethod(...) 方法,設(shè)置任務(wù)調(diào)用 TransactionRecovery#startRecover(...) 方法執(zhí)行。
如果應(yīng)用集群部署,會(huì)不會(huì)相同事務(wù)被多個(gè)定時(shí)任務(wù)同時(shí)重試?
答案是不會(huì),事務(wù)在重試時(shí)會(huì)樂(lè)觀鎖更新,同時(shí)只有一個(gè)應(yīng)用節(jié)點(diǎn)能更新成功。
官方解釋:多機(jī)部署下,所有機(jī)器都宕機(jī),從異常中恢復(fù)時(shí),所有的機(jī)器豈不是都可以查詢到所有的需要恢復(fù)的服務(wù)?
當(dāng)然極端情況下,Socket 調(diào)用超時(shí)時(shí)間大于事務(wù)重試間隔,第一個(gè)節(jié)點(diǎn)在重試某個(gè)事務(wù),一直未執(zhí)行完成,第二個(gè)節(jié)點(diǎn)已經(jīng)可以重試。
ps:建議,Socket 調(diào)用超時(shí)時(shí)間小于事務(wù)重試間隔。
是否定時(shí)任務(wù)和應(yīng)用服務(wù)器解耦?
螞蟻金服的分布式事務(wù)服務(wù) DTS 采用 client-server 模式:
- xts-client :負(fù)責(zé)事務(wù)的創(chuàng)建、提交、回滾、記錄。
- xts-server :負(fù)責(zé)異常事務(wù)的恢復(fù)。
4. 異常事務(wù)恢復(fù)
org.mengyun.tcctransaction.recover.TransactionRecovery,異常事務(wù)恢復(fù),實(shí)現(xiàn)主體代碼如下:
```Java
public class TransactionRecovery {
/**
* 啟動(dòng)恢復(fù)事務(wù)邏輯
*/
public void startRecover() {
// 加載異常事務(wù)集合
List transactions = loadErrorTransactions();
// 恢復(fù)異常事務(wù)集合
recoverErrorTransactions(transactions);
}
}
```
4.1 加載異常事務(wù)集合
調(diào)用 #loadErrorTransactions() 方法,加載異常事務(wù)集合。實(shí)現(xiàn)代碼如下:
private List loadErrorTransactions() {
TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();
return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));
}
- 異常事務(wù)的定義:當(dāng)前時(shí)間超過(guò) - 事務(wù)變更時(shí)間( 最后執(zhí)行時(shí)間 ) >= 事務(wù)恢復(fù)間隔( RecoverConfig#getRecoverDuration() )。這里有一點(diǎn)要注意,已完成的事務(wù)會(huì)從事務(wù)存儲(chǔ)器刪除。
4.2 恢復(fù)異常事務(wù)集合
調(diào)用 #recoverErrorTransactions(...) 方法,恢復(fù)異常事務(wù)集合。實(shí)現(xiàn)代碼如下:
private void recoverErrorTransactions(List transactions) {
for (Transaction transaction : transactions) {
// 超過(guò)最大重試次數(shù)
if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {
logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s
總結(jié)
以上是生活随笔為你收集整理的事务回滚什么意思 try_分布式事务 TCC-Transaction 源码分析——事务恢复的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: openlayer右键菜单_使用Open
- 下一篇: 玩cf出现outofmemory_《穿越