FreeSql接入CAP的实践
CAP
CAP 是一個(gè)基于 .NET Standard 的 C# 庫(kù),它是一種處理分布式事務(wù)的解決方案,同樣具有 EventBus 的功能,它具有輕量級(jí)、易使用、高性能等特點(diǎn)。
https://github.com/dotnetcore/CAP
ADO.NET事務(wù)
1.DotNetCore.CAP.MySql中引用 了如下類庫(kù).在Commit事務(wù)時(shí),會(huì)調(diào)用 Flush方法推送消息
<PackageReference?Include="Microsoft.EntityFrameworkCore"?Version="3.1.7"?/> <PackageReference?Include="Microsoft.EntityFrameworkCore.Relational"?Version="3.1.7"?/> <PackageReference?Include="MySqlConnector"?Version="1.0.1"?/>https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
其中我們能看到,事務(wù)的提交,會(huì)調(diào)用父類CapTransactionBase中的方法Flush。他是protected類型的,并未開放出此接口。
???????protected?virtual?void?Flush(){while?(!_bufferList.IsEmpty){_bufferList.TryDequeue(out?var?message);_dispatcher.EnqueueToPublish(message);}}我們來看一下集成 的demo調(diào)用
????[Route("~/adonet/transaction")]public?IActionResult?AdonetWithTransaction(){using?(var?connection?=?new?MySqlConnection(AppDbContext.ConnectionString)){using?(var?transaction?=?connection.BeginTransaction(_capBus,?true)){//your?business?codeconnection.Execute("insert?into?test(name)?values('test')",?transaction:?(IDbTransaction)transaction.DbTransaction);//for?(int?i?=?0;?i?<?5;?i++)//{_capBus.Publish("sample.rabbitmq.mysql",?DateTime.Now);//}}}return?Ok();}https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
代碼中通過擴(kuò)展IDbConnection類,增加BeginTransaction方法,傳遞了注入的_capBus類,傳了autoCommit
private?readonly?ICapPublisher?_capBus;public?PublishController(ICapPublisher?capPublisher) {_capBus?=?capPublisher; } ///?<summary> ///?Start?the?CAP?transaction ///?</summary> ///?<param?name="dbConnection">The?<see?cref="IDbConnection"?/>.</param> ///?<param?name="publisher">The?<see?cref="ICapPublisher"?/>.</param> ///?<param?name="autoCommit">Whether?the?transaction?is?automatically?committed?when?the?message?is?published</param> ///?<returns>The?<see?cref="ICapTransaction"?/>?object.</returns> public?static?ICapTransaction?BeginTransaction(this?IDbConnection?dbConnection,ICapPublisher?publisher,?bool?autoCommit?=?false) {if?(dbConnection.State?==?ConnectionState.Closed){dbConnection.Open();}var?dbTransaction?=?dbConnection.BeginTransaction();publisher.Transaction.Value?=?publisher.ServiceProvider.GetService<ICapTransaction>();return?publisher.Transaction.Value.Begin(dbTransaction,?autoCommit); }autoCommit:false,(此屬性會(huì)自動(dòng)提交事務(wù),集成其他ORM,不建議開啟)因?yàn)?#xff0c;我們只要調(diào)用 了Publish,他會(huì)調(diào)用MySqlCapTransaction中的Commit(),并執(zhí)行Flush,即消息 會(huì)發(fā)出去。IDbContextTransaction
這段代碼是非常 重要的。
????publisher.Transaction.Value?=?publisher.ServiceProvider.GetService<ICapTransaction>();從CapPublisher中可以看出,事務(wù)是通過AsyncLocal實(shí)現(xiàn)狀態(tài)共享的。
internal?class?CapPublisher?:?ICapPublisher {public?AsyncLocal<ICapTransaction>?Transaction?{?get;?} }publisher.Transaction.Value的類型實(shí)現(xiàn)上才是ICapTransaction ,
CapTransactionExtensions.cs還有一個(gè)擴(kuò)展方法,調(diào)用Begin,相當(dāng)于給當(dāng)前控制器上注入的ICapPublisher設(shè)置了new MySqlConnection(AppDbContext.ConnectionString).BeginTransaction()的值。
??????public?static?ICapTransaction?Begin(this?ICapTransaction?transaction,IDbTransaction?dbTransaction,?bool?autoCommit?=?false){transaction.DbTransaction?=?dbTransaction;transaction.AutoCommit?=?autoCommit;return?transaction;}對(duì)于ADO.NET,我們只要傳遞transaction,就能保證發(fā)送消息和操作DB是一個(gè)事務(wù)了。。
EF Core事務(wù)
同樣,我們看擴(kuò)展方法和使用方式
????public?static?IDbContextTransaction?BeginTransaction(this?DatabaseFacade?database,ICapPublisher?publisher,?bool?autoCommit?=?false){var?trans?=?database.BeginTransaction();publisher.Transaction.Value?=?publisher.ServiceProvider.GetService<ICapTransaction>();var?capTrans?=?publisher.Transaction.Value.Begin(trans,?autoCommit);return?new?CapEFDbTransaction(capTrans);}dbContext.Database就是DatabaseFacade類型。直接能BeginTransaction事務(wù)。
[Route("~/ef/transaction")] public?IActionResult?EntityFrameworkWithTransaction([FromServices]AppDbContext?dbContext) {using?(var?trans?=?dbContext.Database.BeginTransaction(_capBus,?autoCommit:?false)){dbContext.Persons.Add(new?Person()?{?Name?=?"ef.transaction"?});for?(int?i?=?0;?i?<?1;?i++){_capBus.Publish("sample.rabbitmq.mysql",?DateTime.Now);}dbContext.SaveChanges();trans.Commit();}return?Ok(); }同樣,還有一個(gè)Begin擴(kuò)展方法,僅僅是給ICapTransaction賦下值。
public?static?ICapTransaction?Begin(this?ICapTransaction?transaction,IDbContextTransaction?dbTransaction,?bool?autoCommit?=?false) {transaction.DbTransaction?=?dbTransaction;transaction.AutoCommit?=?autoCommit;return?transaction; }在這個(gè)demo,上,autoCommit是false,因?yàn)閐bContext有自己的SaveChanges(),如果發(fā)送不太合適。SaveChanges()要做好些操作,具體不太情況是什么。具體不詳細(xì)研究。
但我們可以看下CapTransactionBase源碼,DbTransaction是Object類型。
EF Core中的事務(wù)類型是IDbContextTransaction
ADO.NET實(shí)際是IDbTransaction類型。
?public?object?DbTransaction?{?get;?set;?}所以在最開始的那段代碼,判斷DbTransaction,是哪種類型,然后調(diào)用自身內(nèi)部使用的事務(wù)進(jìn)行Commit()。如果要集成其他ORM,但又想去掉EFCore的依賴,然后增加其他ORM,如下類似的處理,就是關(guān)鍵,比如CommitAsync,Commit,Roolback()
????public?override?void?Commit(){Debug.Assert(DbTransaction?!=?null);switch?(DbTransaction){case?IDbTransaction?dbTransaction:dbTransaction.Commit();break;case?IDbContextTransaction?dbContextTransaction:dbContextTransaction.Commit();break;}Flush();}還有MySqlDataStorage.cs
https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
判斷dbTransaction的類型,然后獲取當(dāng)前事務(wù),引用其他ORM,記得修改此處。
????var?dbTrans?=?dbTransaction?as?IDbTransaction;if?(dbTrans?==?null?&&?dbTransaction?is?IDbContextTransaction?dbContextTrans){dbTrans?=?dbContextTrans.GetDbTransaction();}參考項(xiàng)目(不打算維護(hù))
https://github.com/luoyunchong/DotNetCore.CAP.Provider
維護(hù)就要保證與上層Dotnetcore/cap項(xiàng)目保持同步,這是一件困難的事。
還有一個(gè)重要的原因是:我們有更簡(jiǎn)單的方式。
FreeSql接入CAP(最簡(jiǎn)單的方式)
關(guān)于此問題的想法
https://github.com/luoyunchong/DotNetCore.CAP.Provider/issues/1
我們還是引用各自官方的庫(kù)
Install-Package?DotNetCore.CAP.Dashboard Install-Package?DotNetCore.CAP.MySql Install-Package?DotNetCore.CAP.RabbitMQ Install-Package?FreeSql Install-Package?FreeSql.DbContext Install-Package?FreeSql.Provider.MySqlConnector關(guān)于CAP集成的方式,配置項(xiàng),這里不做詳情,官方地址有中文:http://cap.dotnetcore.xyz/
代碼參考。因?yàn)橹挥幸粋€(gè)類,我們自行復(fù)制項(xiàng)目即可。
https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Application/CapUnitOfWorkExtensions.cs
重寫擴(kuò)展方法,BeginTransaction。是基于IUnitOfWork的擴(kuò)展。
提交事務(wù)調(diào)用Commit(IUnitOfWork)時(shí),內(nèi)部再通過反射調(diào)用 ICapTransaction中protected類型的方法Flush。
public static class CapUnitOfWorkExtensions{public static void Flush(this ICapTransaction capTransaction){capTransaction?.GetType().GetMethod("Flush", BindingFlags.Instance | BindingFlags.NonPublic)?.Invoke(capTransaction, null);}public static ICapTransaction BeginTransaction(this IUnitOfWork unitOfWork, ICapPublisher publisher, bool autoCommit = false){publisher.Transaction.Value = (ICapTransaction)publisher.ServiceProvider.GetService(typeof(ICapTransaction));return publisher.Transaction.Value.Begin(unitOfWork.GetOrBeginTransaction(), autoCommit);}public static void Commit(this ICapTransaction capTransaction, IUnitOfWork unitOfWork){unitOfWork.Commit();capTransaction.Flush();}}注入我們的FreeSql
public?void?ConfigureServices(IServiceCollection?services){IConfigurationSection?configurationSection?=?Configuration.GetSection($"ConnectionStrings:MySql");IFreeSql?fsql?=?new?FreeSqlBuilder().UseConnectionString(DataType.MySql,?configurationSection.Value);.UseNameConvert(NameConvertType.PascalCaseToUnderscoreWithLower).UseAutoSyncStructure(true).UseNoneCommandParameter(true).UseMonitorCommand(cmd?=>{Trace.WriteLine(cmd.CommandText?+?";");}).Build();services.AddSingleton(fsql);services.AddFreeRepository();services.AddScoped<UnitOfWorkManager>(); }示例
????[HttpGet("~/freesql/unitofwork/{id}")]public?DateTime?UnitOfWorkManagerTransaction(int?id,?[FromServices]?IBaseRepository<Book>?repo){DateTime?now?=?DateTime.Now;using?(IUnitOfWork?uow?=?_unitOfWorkManager.Begin()){ICapTransaction?trans?=?_unitOfWorkManager.Current.BeginTransaction(_capBus,?false);repo.Insert(new?Book(){Author?=?"luoyunchong",Summary?=?"2",Title?=?"122"});_capBus.Publish("freesql.time",?now);trans.Commit(uow);}return?now;}[NonAction][CapSubscribe("freesql.time")]public?void?GetTime(DateTime?time){Console.WriteLine($"time:{time}");}注意trans不需要using,freesql內(nèi)部會(huì)釋放資源。,也可using,但請(qǐng)更新到最新的freesql版本。
ICapTransaction?trans?=?_unitOfWorkManager.Current.BeginTransaction(_capBus,?false);提交事務(wù),也請(qǐng)調(diào)用擴(kuò)展方法,否則事務(wù)無(wú)法正常。
trans.Commit(uow);源碼位置
https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Web/Controllers/v1/TestController.cs
總結(jié)
以上是生活随笔為你收集整理的FreeSql接入CAP的实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用 .NET 5 体验大数据和机器学习
- 下一篇: 简单理解线程同步上下文