日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > asp.net >内容正文

asp.net

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

發布時間:2023/12/4 asp.net 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

在ASP.NET Core Web API下事件驅動型架構的實現(一):一個簡單的實現中,我介紹了事件驅動型架構的一種簡單的實現,并演示了一個完整的事件派發、訂閱和處理的流程。這種實現太簡單了,百十行代碼就展示了一個基本工作原理。然而,要將這樣的解決方案運用到實際生產環境,還有很長的路要走。今天,我們就研究一下在事件處理器中,對象生命周期的管理問題。

事實上,不僅僅是在事件處理器中,我們需要關心對象的生命周期,在整個ASP.NET Core Web API的應用程序里,我們需要理解并仔細推敲被注冊到IoC容器中的服務,它們的生命周期應該是個怎樣的情形,這也是服務端應用程序設計必須認真考慮的內容。因為如果生命周期管理不合理,程序申請的資源無法合理釋放,最后便會帶來內存泄漏、程序崩潰等各種問題,然而這樣的問題對于服務端應用程序來說,是非常嚴重的。

記得在上一篇文章的結束部分,我給大家留下一個練習,就是讓大家在CustomerCreatedEventHandler事件處理器的HandleAsync方法中,填入自己的代碼,以便對獲得的事件消息做進一步的處理。作為本文的引子,我們首先將這部分工作做完,然后再進一步分析生命周期的問題。

Event Store

Event Store是CQRS體系結構模式中最為重要的一個組成部分,它的主要職責就是保存發生于領域模型中的領域事件,并對事件數據進行歸檔。當倉儲需要獲取領域模型對象時,Event Store也會配合快照數據庫一起,根據領域事件的發生順序,逐步回放并重塑領域模型對象。事實上,Event Store的實現是非常復雜的,雖然從它的職責上來看并不算太復雜,然而它所需要解決的事件同步、快照、性能、消息派發等問題,使得CQRS體系結構的實現變得非常復雜。在實際應用中,已經有一些比較成熟的框架和工具集,能夠幫助我們在CQRS中很方便地實現Event Store,比如GetEventStore就是一個很好的開源Event Store框架,它是基于.NET開發的,在微軟官方的eShopOnContainers說明文檔中,也提到了這個框架,推薦大家上他們的官網(https://eventstore.org/)了解一下。在這里我們就先不深入研究Event Store應該如何實現,我們先做一個簡單的Event Store,以便展示我們需要討論的問題。

延續著上一版的代碼庫(https://github.com/daxnet/edasample/tree/chapter_1),我們首先在EdaSample.Common.Events命名空間下,定義一個IEventStore的接口,這個接口非常簡單,僅僅包含一個保存事件的方法,代碼如下:


public interface IEventStore : IDisposable{????Task SaveEventAsync<TEvent>(TEvent @event)????????where TEvent : IEvent;}

SaveEventAsync方法僅有一個參數:由泛型類型TEvent綁定的@event對象。泛型約束表示SaveEventAsync方法僅能接受IEvent接口及其實現類型的對象作為參數傳入。接口定義好了,下一步就是實現這個接口,對傳入的事件對象進行保存。為了實現過程的簡單,我們使用Dapper,將事件數據保存到SQL Server數據庫中,來模擬Event Store對事件的保存操作。

Note:為什么IEventStore接口的SaveEventAsync方法簽名中,沒有CancellationToken參數?嚴格來說,支持async/await異步編程模型的方法定義上,是需要帶上CancellationToken參數的,以便調用方請求取消操作的時候,方法內部可以根據情況對操作進行取消。然而有些情況下取消操作并不是那么合理,或者方法內部所使用的API并沒有提供更深層的取消支持,因此也就沒有必要在方法定義上增加CancellationToken參數。在此處,為了保證接口的簡單,沒有引入CancellationToken的參數。

接下來,我們實現這個接口,并用Dapper將事件數據保存到SQL Server中。出于框架設計的考慮,我們新建一個Net Standard Class Library項目,在這個新的項目中實現IEventStore接口,這么做的原因已經在上文中介紹過了。代碼如下:


public class DapperEventStore : IEventStore{????private readonly string connectionString;????public DapperEventStore(string connectionString)????{????????this.connectionString = connectionString;????}????public async Task SaveEventAsync<TEvent>(TEvent @event) where TEvent : IEvent????{????????const string sql = @"INSERT INTO [dbo].[Events] ([EventId], [EventPayload], [EventTimestamp]) VALUES (@eventId, @eventPayload, @eventTimestamp)";????????using (var connection = new SqlConnection(this.connectionString))????????{????????????await connection.ExecuteAsync(sql, new????????????{????????????????eventId = @event.Id,????????????????eventPayload = JsonConvert.SerializeObject(@event),????????????????eventTimestamp = @event.Timestamp????????????});????????}????}????#region IDisposable Support????// 此處省略????#endregion}

IDisposable接口的實現部分暫且省略,可以看到,實現還是非常簡單的:通過構造函數傳入數據庫的連接字符串,在SaveEventAsyc方法中,基于SqlConnection對象執行Dapper的擴展方法來完成事件數據的保存。

Note: 此處使用了JsonConvert.SerializeObject方法來序列化事件對象,也就意味著DapperEventStore程序集需要依賴Newtonsoft.Json程序集。雖然在我們此處的案例中不會有什么影響,但這樣做會造成DapperEventStore對Newtonsoft.Json的強依賴,這樣的依賴關系不僅讓DapperEventStore變得不可測試,而且Newtonsoft.Json將來未知的變化,也會影響到DapperEventStore,帶來一些不確定性和維護性問題。更好的做法是,引入一個IMessageSerializer接口,在另一個新的程序集中使用Newtonsoft.Json來實現這個接口,同時僅讓DapperEventStore依賴IMessageSerializer,并在應用程序啟動時,將Newtonsoft.Json的實現注冊到IoC容器中。此時,IMessageSerializer可以被Mock,DapperEventStore就變得可測試了;另一方面,由于只有那個新的程序集會依賴Newtonsoft.Json,因此,Newtonsoft.Json的變化也僅僅會影響那個新的程序集,不會對框架主體的其它部分造成任何影響。

EventStore實現好了,接下來,我們將其用在CustomerCreatedEventHandler中,以便將訂閱的CustomerCreatedEvent保存下來。

事件數據的保存

保存事件數據的第一步,就是在ASP.NET Core Web API的IoC容器中,將DapperEventStore注冊進去。這一步是非常簡單的,只需要在Startup.cs的ConfigureServices方法中完成即可。代碼如下:


public void ConfigureServices(IServiceCollection services){????services.AddMvc();????services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();????services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"]));????services.AddSingleton<IEventBus, PassThroughEventBus>();}

注意我們使用的是services.AddTransient方法來注冊DapperEventStore,我們希望應用程序在每次請求IEventStore實例時,都能獲得一個新的DapperEventStore的實例。

接下來,打開CustomerCreatedEventHandler.cs文件,在構造函數中加入對IEventStore的依賴,然后修改HandleAsync方法,在該方法中使用IEventStore的實例來完成事件數據的保存。代碼如下:



public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>{????private readonly IEventStore eventStore;????public CustomerCreatedEventHandler(IEventStore eventStore)????{????????this.eventStore = eventStore;????}????public bool CanHandle(IEvent @event)????????=> @event.GetType().Equals(typeof(CustomerCreatedEvent));????public async Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default)????{????????await this.eventStore.SaveEventAsync(@event);????????return true;????}????public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default)????????=> CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);}

OK,代碼修改完畢,測試一下。

看看數據庫中客戶信息是否已經創建:

看看數據庫中事件數據是否已經保存成功:

OK,數據全部保存成功。

然而,事情真的就這么簡單么?No。在追蹤了IEventStore實例(也就是DapperEventStore)的生命周期后,你會發現,問題沒有想象的那么簡單。

追蹤對象的生命周期

在使用services.AddTransient/AddScoped/AddSingleton/AddScoped這些方法對服務進行注冊時,使用不同的方法也就意味著選擇了不同的對象生命周期。在此我們也不再深入討論每種方法之間的差異,微軟官方有詳細的文檔和demo(抱歉我沒有貼出中文鏈接,因為機器翻譯的緣故,實在有點不堪入目),如果對ASP.NET Core的IoC容器不熟悉的話,建議先了解一下官網文章的內容。在上面我稍微提了一下,我們是用AddTransient方法來注冊DapperEventStore的,因為我們希望在每次使用IEventStore的時候,都會有一個新的DapperEventStore被創建。現在,讓我們來驗證一下,看情況是否果真如此。

日志的使用

追蹤程序執行的最有效的方式就是使用日志。在我們的場景中,使用基于文件的日志會更合適,因為這樣我們可以更清楚地看到程序的執行過程以及對象的變化過程。同樣,我不打算詳細介紹如何在ASP.NET Core Web API中使用日志,微軟官網同樣有著非常詳盡的文檔來介紹這些內容。在這里,我簡要地將相關代碼列出來,以介紹如何啟用基于文件的日志系統。

首先,在Web API服務的項目上,添加對Serilog.Extensions.Logging.File的nuget包,使用它能夠非常方便地啟用基于文件的日志。然后,打開Program.cs文件,添加ConfigureLogging的調用:


public static IWebHost BuildWebHost(string[] args) =>????WebHost.CreateDefaultBuilder(args)????????.ConfigureLogging((context, lb) =>????????{????????????lb.AddFile(LogFileName);????????})????????.UseStartup<Startup>()????????.Build();

此處LogFileName為本地文件系統中的日志文件文件名,為了避免權限問題,我將日志寫入C:\Users\<user>\appdata\local目錄下,因為我的Web API進程是由當前登錄用戶啟動的,所以寫在這個目錄下不會有權限問題。如果今后我們把Web API host在IIS中,那么啟動IIS服務的用戶需要對日志所在的目錄具有寫入的權限,日志文件才能被正確寫入,這一點是需要注意的。

好了,現在可以使用日志了,先試試看。在Startup類的構造函數中,加入ILoggerFactory參數,并在構造函數執行時獲取ILogger實例,然后在ConfigureServices調用中輸出一些內容:


public class Startup{????private readonly ILogger logger;????public Startup(IConfiguration configuration, ILoggerFactory loggerFactory)????{????????Configuration = configuration;????????this.logger = loggerFactory.CreateLogger<Startup>();????}????public IConfiguration Configuration { get; }????public void ConfigureServices(IServiceCollection services)????{????????this.logger.LogInformation("正在對服務進行配置...");????????services.AddMvc();????????services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();????????services.AddTransient<IEventStore>(serviceProvider => ????????????new DapperEventStore(Configuration["mssql:connectionString"]));????????services.AddSingleton<IEventBus, PassThroughEventBus>();????????this.logger.LogInformation("服務配置完成,已注冊到IoC容器!");????}????// 其它方法暫時省略}

現在重新啟動服務,然后查看日志文件,發現日志可以被正確輸出:

接下來,使用類似的方式,向PassThroughEventBus的構造函數和Dispose方法中加入一些日志輸出,在CustomersController的Create方法中、CustomerCreatedEventHandler的構造函數和HandleAsync方法中、DapperEventStore的構造函數和Dispose方法中也加入一些日志輸出,以便能夠觀察當新的客戶信息被創建時,Web API的執行過程。限于文章篇幅,就不在此一一貼出各方法中加入日志輸出的代碼了,大家可以根據本文最后所提供的源代碼鏈接來獲取源代碼。簡單地舉個例子吧,比如對于DapperEventStore,我們通過構造函數注入ILogger的實例:


public class DapperEventStore : IEventStore{????private readonly string connectionString;????private readonly ILogger logger;????public DapperEventStore(string connectionString,????????ILogger<DapperEventStore> logger)????{????????this.connectionString = connectionString;????????this.logger = logger;????????logger.LogInformation($"DapperEventStore構造函數調用完成。Hash Code:{this.GetHashCode()}.");????}????// 其它函數省略}

這樣一來,在DapperEventStore的其它方法中,就可以通過logger來輸出日志了。

發現問題

同樣,再次運行Web API,并通過Powershell發起一次創建客戶信息的請求,然后打開日志文件,整個程序的執行過程基本上就一目了然了:

從上面的日志內容可以得知,當應用程序正常退出時,由IoC容器托管的PassThroughEventBus和DapperEventStore都能夠被正常Dispose,目前看來沒什么問題,因為資源可以正常釋放。現在讓我們重新啟動Web API,連續發送兩次創建客戶信息的請求,再次查看日志,我們得到了下面的內容:

從上面的日志內容可以看到,在Web API的整個運行期間,CustomerCreatedEventHandler僅被構造了一次,而且在每次處理CustomerCreatedEvent事件的時候,都是使用同一個DapperEventStore實例來保存事件數據。也就是說,CustomerCreatedEventHandler和DapperEventStore在整個Web API服務的生命周期中,有且僅有一個實例,它們是Singleton的!然而,在進行系統架構的時候,我們應該盡量保證較短的對象生命周期,以免因為狀態的不一致性導致不可回滾的錯誤出現,這也是架構設計中的一種最佳實踐。雖然目前我們的DapperEventStore在程序正常退出的時候能夠被Dispose掉,但如果DapperEventStore使用了非托管資源,并且非托管資源并沒有很好地管理自己的內存呢?久而久之,DapperEventStore就產生了內存泄漏點,慢慢地,Web API就會出現內存泄漏,系統資源將被耗盡。假如Web API被部署在云中,應用程序監控裝置(比如AWS的Cloud Watch)就會持續報警,并強制服務斷線,整個系統的可用性就無法得到保障。所以,我們更期望DapperEventStore能夠正確地實現C#的Dispose模式,在Dispose方法中合理地釋放資源,并且僅在需要使用DapperEventStore時候才去構建它,用完就及時Dispose,以保證資源的合理使用。這也就是為什么我們使用services.AddTransient方法來注冊CustomerCreatedEventHandler以及DapperEventStore的原因。

然而,事實卻并非如此。究其原因,就是因為PassThroughEventBus是單例實例,它的生命周期是整個Web API服務。而在PassThroughEventBus的構造函數中,CustomerCreatedEventHandler被作為參數傳入,于是,PassThroughEventBus產生了對CustomerCreatedEventHandler的依賴,而連帶地也產生了對DapperEventStore的依賴。換句話說,在整個應用程序運行的過程中,IoC框架完全沒有理由再去創建新的CustomerCreatedEventHandler以及DapperEventStore的實例,因為事件處理器作為強引用被注冊到PassThroughEventBus中,而PassThroughEventBus至始至終沒有變過!

Note:為什么PassThroughEventBus可以作為單例注冊到IoC容器中?因為它提供了無狀態的全局性的基礎結構層服務:事件總線。在PassThroughEventBus的實現中,這種全局性體現得不明顯,我們當然可以每一次HTTP請求都創建一個新的PassThroughEventBus來轉發事件消息并作處理。然而,在今后我們要實現的基于RabbitMQ的事件總線中,如果我們還是每次HTTP請求都創建一個新的消息隊列,不僅性能得不到保證,而且消息并不能路由到新創建的channel上。注意:我們將其注冊成單例,一個很重要的依據是由于它是無狀態的,但即使如此,我們也要注意在應用程序退出的時候,合理Dispose掉它所占用的資源。當然,在這里,ASP.NET Core的IoC機制會幫我們解決這個問題(因為我注冊了PassThroughEventBus,但我沒有顯式調用Dispose方法,我仍然能從日志中看到“PassThroughEventBus已經被Dispose”的字樣),然而有些情況下,ASP.NET Core不會幫我們做這些,就需要我們自己手工完成。

OMG!由于構造函數注入,使得對象之間產生了依賴關系,從而影響到了它們的生命周期,這可怎么辦?既然問題是由依賴引起的,那么就需要想辦法解耦。

解耦!解決事件處理器對象生命周期問題

經過分析,我們需要解除PassThroughEventBus對各種EventHandler的直接依賴。因為PassThroughEventBus是單例的,那么由它引用的所有組件也只可能具有相同的生命周期。然而,這樣的解耦又該如何做呢?將EventHandler封裝到另一個類中?結果還是一樣,PassThroughEventBus總會通過某種對象關系,來間接引用到EventHandler上,造成EventHandler全局唯一。

或許,應該要有另一套生命周期管理體系來管理EventHandler的生命周期,使得每當PassThroughEventBus需要使用EventHandler對所訂閱的事件進行處理的時候,都會通過這套體系來請求新的EventHandler實例,這樣一來,PassThroughEventBus也就不再依賴于某個特定的實例了,而僅僅是引用了各種EventHandler在新的生命周期管理體系中的注冊信息。每當需要的時候,PassThroughEventBus都會將事件處理器的注冊信息傳給新的管理體系,然后由這套新的體系來維護事件處理器的生命周期。

通過閱讀微軟官方的eShopOnContainers案例代碼后,證實了這一想法。在案例中,有如下代碼:



// namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQprivate async Task ProcessEvent(string eventName, string message){????if (_subsManager.HasSubscriptionsForEvent(eventName))????{????????using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))????????{????????????var subscriptions = _subsManager.GetHandlersForEvent(eventName);????????????foreach (var subscription in subscriptions)????????????{????????????????if (subscription.IsDynamic)????????????????{ ????????????????????var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;????????????????????dynamic eventData = JObject.Parse(message);????????????????????await handler.Handle(eventData);????????????????}????????????????else????????????????{????????????????????var eventType = _subsManager.GetEventTypeByName(eventName);????????????????????var integrationEvent = JsonConvert.DeserializeObject(message, eventType);????????????????????var handler = scope.ResolveOptional(subscription.HandlerType);????????????????????var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);????????????????????await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });????????????????}????????????}????????}????}}

可以看到,高亮的這一行,通過Autofac創建了一個新的LifetimeScope,在這個Scope中,通過eventName來獲得一個subscription對象(也就是EventHandler的注冊信息),進而通過scope的ResolveOptional調用來獲得新的EventHandler實例。基本過程就是這樣,目前也不需要糾結IDynamicIntegrationEventHandler是干什么用的,也不需要糾結為什么要使用dynamic來保存事件數據。重點是,autofac的BeginLifetimeScope方法調用創建了一個新的IoC Scope,在這個Scope中解析(resolve)了新的EventHandler實例。在eShopOnContainer案例中,EventBusRabbitMQ的設計是特定的,必須依賴于Autofac作為依賴注入框架。或許這部分設計可以進一步改善,使得EventBusRabbitMQ不會強依賴于Autofac。

接下來,我們會引入一個新的概念:事件處理器執行上下文,使用類似的方式來解決對象生命周期問題。

事件處理器執行上下文

事件處理器執行上下文(Event Handler Execution Context, EHEC)為事件處理器提供了一個完整的生命周期管理機制,在這套機制中,事件處理器及其引用的對象資源可以被正常創建和正常銷毀。現在讓我們一起看看,如何在EdaSample的案例代碼中使用事件處理器執行上下文。

事件處理器執行上下文的接口定義如下,當然,這部分接口是放在EdaSample.Common.Events目錄下,作為消息系統的框架代碼提供給調用方:


public interface IEventHandlerExecutionContext{????void RegisterHandler<TEvent, THandler>()????????where TEvent : IEvent????????where THandler : IEventHandler<TEvent>;????void RegisterHandler(Type eventType, Type handlerType);????bool HandlerRegistered<TEvent, THandler>()????????where TEvent : IEvent????????where THandler : IEventHandler<TEvent>;????bool HandlerRegistered(Type eventType, Type handlerType);????Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default);}

這個接口主要包含三種方法:注冊事件處理器、判斷事件處理器是否已經注冊,以及對接收到的事件消息進行處理。整個結構還是非常清晰簡單的。現在需要實現這個接口。根據上面的分析,這個接口的實現是需要依賴于IoC容器的,目前簡單起見,我們僅使用微軟ASP.NET Core標準的Dependency Injection框架來實現,當然,也可以使用Autofac,取決于你怎樣去實現上面這個接口。需要注意的是,由于該接口的實現是需要依賴于第三方組件的(在這里是微軟的Dependency Injection框架),因此,最佳做法是新建一個類庫,并引用EdaSample.Common程序集,并在這個新的類庫中,依賴Dependency Injection框架來實現這個接口。

以下是基于Microsoft.Extensions.DependencyInjection框架來實現的事件處理器執行上下文完整代碼,這里有個兼容性問題,就是構造函數的第二個參數:serviceProviderFactory。在Microsoft.Extensions.DependencyInjection框架2.0版本之前,IServiceCollection.BuildServiceProvider方法的返回類型是IServiceProvider,但從2.0開始,它的返回類型已經從IServiceProvider接口,變成了ServiceProvider類。這里引出了框架設計的另一個原則,就是依賴較低版本的.NET Core,以便獲得更好的兼容性。如果我們的EdaSample是使用.NET Core 1.1開發的,那么當下面這個類被直接用在ASP.NET Core 2.0的項目中時,如果不通過構造函數參數傳入ServiceProvider創建委托,而是直接在代碼中使用registry.BuildServiceProvider調用,就會出現異常。



public class EventHandlerExecutionContext : IEventHandlerExecutionContext{????private readonly IServiceCollection registry;????private readonly Func<IServiceCollection, IServiceProvider> serviceProviderFactory;????private readonly ConcurrentDictionary<Type, List<Type>> registrations = new ConcurrentDictionary<Type, List<Type>>();????public EventHandlerExecutionContext(IServiceCollection registry, ????????Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null)????{????????this.registry = registry;????????this.serviceProviderFactory = serviceProviderFactory ?? (sc => registry.BuildServiceProvider());????}????public async Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken))????{????????var eventType = @event.GetType();????????if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypes) &&????????????handlerTypes?.Count > 0)????????{????????????var serviceProvider = this.serviceProviderFactory(this.registry);????????????using (var childScope = serviceProvider.CreateScope())????????????{????????????????foreach(var handlerType in handlerTypes)????????????????{????????????????????var handler = (IEventHandler)childScope.ServiceProvider.GetService(handlerType);????????????????????if (handler.CanHandle(@event))????????????????????{????????????????????????await handler.HandleAsync(@event, cancellationToken);????????????????????}????????????????}????????????}????????}????}????public bool HandlerRegistered<TEvent, THandler>()????????where TEvent : IEvent????????where THandler : IEventHandler<TEvent>????????=> this.HandlerRegistered(typeof(TEvent), typeof(THandler));????public bool HandlerRegistered(Type eventType, Type handlerType)????{????????if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypeList))????????{????????????return handlerTypeList != null && handlerTypeList.Contains(handlerType);????????}????????return false;????}????public void RegisterHandler<TEvent, THandler>()????????where TEvent : IEvent????????where THandler : IEventHandler<TEvent>????????=> this.RegisterHandler(typeof(TEvent), typeof(THandler));????public void RegisterHandler(Type eventType, Type handlerType)????{????????Utils.ConcurrentDictionarySafeRegister(eventType, handlerType, this.registrations);????????this.registry.AddTransient(handlerType);????}}

好了,事件處理器執行上下文就定義好了,接下來就是在我們的ASP.NET Core Web API中使用。為了使用IEventHandlerExecutionContext,我們需要修改事件訂閱器的接口定義,并相應地修改PassThroughEventBus以及Startup.cs。代碼如下:



// IEventSubscriberpublic interface IEventSubscriber : IDisposable{????void Subscribe<TEvent, TEventHandler>()????????where TEvent : IEvent????????where TEventHandler : IEventHandler<TEvent>;}// PassThroughEventBuspublic sealed class PassThroughEventBus : IEventBus{????private readonly EventQueue eventQueue = new EventQueue();????private readonly ILogger logger;????private readonly IEventHandlerExecutionContext context;????public PassThroughEventBus(IEventHandlerExecutionContext context,????????ILogger<PassThroughEventBus> logger)????{????????this.context = context;????????this.logger = logger;????????logger.LogInformation($"PassThroughEventBus構造函數調用完成。Hash Code:{this.GetHashCode()}.");????????eventQueue.EventPushed += EventQueue_EventPushed;????}????private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)????????=> await this.context.HandleEventAsync(e.Event);????public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)????????where TEvent : IEvent????????????=> Task.Factory.StartNew(() => eventQueue.Push(@event));????public void Subscribe<TEvent, TEventHandler>()????????where TEvent : IEvent????????where TEventHandler : IEventHandler<TEvent>????{????????if (!this.context.HandlerRegistered<TEvent, TEventHandler>())????????{????????????this.context.RegisterHandler<TEvent, TEventHandler>();????????}????}????#region IDisposable Support????private bool disposedValue = false; // To detect redundant calls????void Dispose(bool disposing)????{????????if (!disposedValue)????????{????????????if (disposing)????????????{????????????????this.eventQueue.EventPushed -= EventQueue_EventPushed;????????????????logger.LogInformation($"PassThroughEventBus已經被Dispose。Hash Code:{this.GetHashCode()}.");????????????}????????????disposedValue = true;????????}????}????public void Dispose() => Dispose(true);????#endregion}// Startup.cspublic void ConfigureServices(IServiceCollection services){????this.logger.LogInformation("正在對服務進行配置...");????services.AddMvc();????services.AddTransient<IEventStore>(serviceProvider => ????????new DapperEventStore(Configuration["mssql:connectionString"], ????????????serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));????var eventHandlerExecutionContext = new EventHandlerExecutionContext(services, ????????sc => sc.BuildServiceProvider());????services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);????services.AddSingleton<IEventBus, PassThroughEventBus>();????this.logger.LogInformation("服務配置完成,已注冊到IoC容器!");}// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.public void Configure(IApplicationBuilder app, IHostingEnvironment env){????var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();????eventBus.Subscribe<CustomerCreatedEvent, CustomerCreatedEventHandler>();????if (env.IsDevelopment())????{????????app.UseDeveloperExceptionPage();????}????app.UseMvc();}

代碼修改完成后,再次執行Web API,并發送兩次(或多次)創建客戶的請求,然后查看日志,我們發現,每次請求都會使用新的事件處理器去處理接收到的消息,在保存消息數據時,會使用新的DapperEventStore來保存數據,而保存完成后,會及時將DapperEventStore dispose掉:

小結

本文篇幅比較長,或許你沒有太多耐心將文章讀完。但我盡量將問題分析清楚,希望提供給讀者的內容是詳細的、有理有據的。文章中黑體部分是在設計過程中的一些思考和需要注意的地方,希望能夠給讀者在工作和學習之中帶來啟發和收獲。總而言之,對象生命周期的管理,在服務端應用程序中是非常重要的,需要引起足夠的重視。在下文中,我們打算逐步擺脫PassThroughEventBus,基于RabbitMQ來實現消息總線的基礎結構。

源代碼的使用

本系列文章的源代碼在https://github.com/daxnet/edasample這個Github Repo里,通過不同的release tag來區分針對不同章節的源代碼。本文的源代碼請參考chapter_2這個tag,如下:

相關文章:?

  • ASP.NET Core Web API下事件驅動型架構的實現(一):一個簡單的實現

原文地址:https://www.cnblogs.com/daxnet/p/8270480.html


.NET社區新聞,深度好文,歡迎訪問公眾號文章匯總 http://www.csharpkit.com

總結

以上是生活随笔為你收集整理的ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。