日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > asp.net >内容正文

asp.net

TPL Dataflow .Net 数据流组件,了解一下?

發布時間:2025/3/15 asp.net 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 TPL Dataflow .Net 数据流组件,了解一下? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

回顧上文

  作為單體程序,依賴的第三方服務雖不多,但是2C的程序還是有不少內容可講; 作為一個常規互聯網系統,無外乎就是接受請求、處理請求,輸出響應。

由于業務漸漸增長,數據處理的過程會越來越復雜和冗長,【連貫高效的處理數據】 越來越被看重,? .Net 提供了TPL? Dataflow組件使我們更高效的實現基于數據流和 流水線操作的代碼

? ? 下圖是單體程序中 數據處理的用例圖。

?

?程序中用到的TPL Dataflow 組件,Dataflow是微軟前幾年給出的數據處理庫,?是由不同的處理塊組成,可將這些塊組裝成一個處理管道,"塊"對應處理管道中的"階段", 可類比AspNetCore 中Middleware 和pipeline.。

  • TPL Dataflow庫為消息傳遞和并行化CPU密集型和I / O密集型應用程序提供了編程基礎,這些應用程序具有高吞吐量和低延遲。它還可以讓您明確控制數據的緩沖方式并在系統中移動。

  • 為了更好地理解數據流編程模型,請考慮從磁盤異步加載圖像并創建這些圖像的應用程序。
    • ? 傳統的編程模型通常使用回調和同步對象(如鎖)來協調任務和訪問共享數據, 從宏觀看傳統模型: 任務是一步步緊接著完成的

    • ? 通過使用數據流編程模型,您可以創建在從磁盤讀取圖像時處理圖像的數據流對象。在數據流模型下,您可以聲明數據在可用時的處理方式以及數據之間的依賴關系。由于運行時管理數據之間的依賴關系,因此通常可以避免同步訪問共享數據的要求。此外,由于運行時調度基于數據的異步到達而工作,因此數據流可以通過有效地管理底層線程來提高響應性和吞吐量。  ? 也就是說: 你定義的是任務內容和任務之間的依賴,不關注數據什么時候流到這個任務?。

  • ? ?需要注意的是:TPL Dataflow 非分布式數據流,消息在進程內傳遞,? ?使用nuget引用?System.Threading.Tasks.Dataflow 包。

TPL Dataflow 核心概念

?1.? Buffer & Block

TPL Dataflow 內置的Block覆蓋了常見的應用場景,當然如果內置塊不能滿足你的要求,你也可以自定“塊”。

Block可以劃分為下面3類:

  • Buffering Only? ? 【Buffer不是緩存Cache的概念, 而是一個緩沖區的概念】

  • Execution

  • Grouping?

使用以上塊混搭處理管道, 大多數的塊都會執行一個操作,有些時候需要將消息分發到不同Block,這時可使用特殊類型的緩沖塊給管道“”分叉”。

2. Execution Block

可執行的塊有兩個核心組件:
  • 輸入、輸出消息的緩沖區(一般稱為Input,Output隊列)

  • 在消息上執行動作的委托

  消息在輸入和輸出時能夠被緩沖:當Func委托的運行速度比輸入的消息速度慢時,后續消息將在到達時進行緩沖;當下一個塊的輸入緩沖區中沒有容量時,將在輸出時緩沖。

每個塊我們可以配置:

  • 緩沖區的總容量, 默認無上限

  • 執行操作委托的并發度, 默認情況下塊按照順序處理消息,一次一個。

我們將塊鏈接在一起形成一個處理管道,生產者將消息推向管道。

TPL Dataflow有一個基于pull的機制(使用Receive和TryReceive方法),但我們將在管道中使用塊連接和推送機制。

  • TransformBlock(Execution category)-- 由輸入輸出緩沖區和一個Func<TInput, TOutput>委托組成,消費的每個消息,都會輸出另外一個,你可以使用這個Block去執行輸入消息的轉換,或者轉發輸出的消息到另外一個Block。

  • TransformManyBlock (Execution category) -- 由輸入輸出緩沖區和一個Func<TInput, IEnumerable<TOutput>>委托組成, 它為輸入的每個消息輸出一個 IEnumerable<TOutput>

  • BroadcastBlock (Buffering category)-- 由只容納1個消息的緩沖區和Func<T, T>委托組成。緩沖區被每個新傳入的消息所覆蓋,委托僅僅為了讓你控制怎樣克隆這個消息,不做消息轉換。

            該塊可以鏈接到多個塊(管道的分叉),雖然它一次只緩沖一條消息,但它一定會在該消息被覆蓋之前將該消息轉發到鏈接塊(鏈接塊還有緩沖區)。

  • ActionBlock (Execution category)-- 由緩沖區和Action<T>委托組成,他們一般是管道的結尾,他們不再給其他塊轉發消息,他們只會處理輸入的消息。

  • BatchBlock (Grouping category)-- 告訴它你想要的每個批處理的大小,它將累積消息,直到它達到那個大小,然后將它作為一組消息轉發到下一個塊。

  還有一下其他的Block類型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,我們暫時不會深入。

3. Pipeline Chain React

  當輸入緩沖區達到上限容量,為其供貨的上游塊的輸出緩沖區將開始填充,當輸出緩沖區已滿時,該塊必須暫停處理,直到緩沖區有空間,這意味著一個Block的處理瓶頸可能導致所有前面的塊的緩沖區被填滿。

  但是不是所有的塊變滿時,都會暫停,BroadcastBlock 有允許1個消息的緩沖區,每個消息都會被覆蓋, 因此如果這個廣播塊不能將消息轉發到下游,則在下個消息到達的時候消息將丟失,這在某種意義上是一種限流(比較生硬).

編程實踐

?   將按照上圖實現TPL Dataflow?

①? 定義Dataflow? pipeline public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory){_httpClient = httpClientFactory.CreateClient("bce-request");_redisDB0 = redisCache[0];_redisDB = redisCache;_logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));var option = new DataflowLinkOptions { PropagateCompletion = true };publisher = _redisDB.RedisConnection.GetSubscriber();_eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>(// redis piublih 沒有做在TransformBlock fun里面, 因為publih失敗可能影響后續的block傳遞eqidPair => EqidResolverAsync(eqidPair),new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")});// https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline_logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);_logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );_broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容納一個消息的緩存區和拷貝函數組成 _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);_broadcastBlock.LinkTo(_logPublishBlock, option);_eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);} public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase{private readonly string _dirPath;private readonly Timer _triggerBatchTimer;private readonly Timer _openFileTimer;private DateTime? _nextCheckpoint;private TextWriter _currentWriter;private readonly LogHead _logHead;private readonly object _syncRoot = new object();private readonly ILogger _logger;private readonly BatchBlock<T> _packer;private readonly ActionBlock<T[]> batchWriterBlock;private readonly TimeSpan _logFileIntervalTimeSpan;/// <summary>/// Generate request log file./// </summary>public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory){_logger = loggerFactory.CreateLogger<LogBatchBlock<T>>();_dirPath = logConfig.DirPath;if (!Directory.Exists(_dirPath)){Directory.CreateDirectory(_dirPath);}_logHead = logConfig.LogHead;_packer = new BatchBlock<T>(logConfig.BatchSize);batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models)); _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });_triggerBatchTimer = new Timer(state =>{_packer.TriggerBatch();}, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));_logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval);_openFileTimer = new Timer(state =>{AlignCurrentFileTo(DateTime.Now);}, null, TimeSpan.Zero, _logFileIntervalTimeSpan);}public ITargetBlock<T> InputBlock => _packer;private void AlignCurrentFileTo(DateTime dt){if (!_nextCheckpoint.HasValue){OpenFile(dt);}if (dt >= _nextCheckpoint.Value){CloseFile();OpenFile(dt);}}private void OpenFile(DateTime now, string fileSuffix = null){string filePath = null;try{var currentHour = now.Date.AddHours(now.Hour);_nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan);int hourConfiguration = _logFileIntervalTimeSpan.Hours;int minuteConfiguration = _logFileIntervalTimeSpan.Minutes;filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log";var appendHead = !File.Exists(filePath);if (filePath != null){var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write);var sw = new StreamWriter(stream, Encoding.Default);if (appendHead){sw.Write(GenerateHead());}_currentWriter = sw;_logger.LogDebug($"{currentHour} TextWriter has been created.");}}catch (UnauthorizedAccessException ex){_logger.LogWarning("I/O error or specific type of scecurity error,{0}", ex);throw;}catch (Exception e){if (fileSuffix == null){_logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}.", e.StackTrace);OpenFile(now, $"-{Guid.NewGuid()}");}else{_logger.LogError($"OpenFile failed after retry: {filePath}", e);throw;}}}private void CloseFile(){if (_currentWriter != null){_currentWriter.Flush();_currentWriter.Dispose();_currentWriter = null;_logger.LogDebug($"{DateTime.Now} TextWriter has been disposed.");}_nextCheckpoint = null;}private string GenerateHead(){StringBuilder head = new StringBuilder();head.AppendLine("#Software: " + _logHead.Software).AppendLine("#Version: " + _logHead.Version).AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}").AppendLine("#Fields: " + _logHead.Fields);return head.ToString();}private void WriteToFile(T[] models){try{lock (_syncRoot){var flag = false;foreach (var model in models){if (model == null)continue;flag = true;AlignCurrentFileTo(model.ServerLocalTime);_currentWriter.WriteLine(model.ToString());}if (flag)_currentWriter.Flush();}}catch (Exception ex){_logger.LogError("WriteToFile Error : {0}", ex.Message);}}public bool AcceptLogModel(T model){return _packer.Post(model);}public string GetDirPath(){return _dirPath;}public async Task CompleteAsync(){_triggerBatchTimer.Dispose();_openFileTimer.Dispose();_packer.TriggerBatch();_packer.Complete();await InputBlock.Completion;lock (_syncRoot){CloseFile();}}} 仿IIS日志存儲代碼

② 異常處理

  上述程序在部署時就遇到相關的坑位,在測試環境_eqid2ModelTransformBlock?內Func委托穩定執行,程序并未出現異樣;

  部署到生產之后, 該Pipeline會運行一段時間就停止工作,一直很困惑, 后來通過監測_eqid2ModelTransformBlock.Completion 屬性,該塊提前進入“完成態”? ?:???程序在執行某次Func委托時報錯,Block提前進入完成態

TransfomrBlock.Completion 一個Task對象,當TPL Dataflow不再處理消息并且能保證不再處理消息的時候,就被定義為完成態, Task對象的TaskStatus枚舉值將標記此Block進入完成態的真實原因

- TaskStatus.RanToCompletion? ? ? ?根據Block定義的任務成功完成

- TaskStatus.Fault? ? ? ? ? ? ? ? ? ? ? ? ? ? 因為未處理的異常?導致"過早的完成"

- TaskStatus.Cancled? ? ? ? ? ? ? ? ? ? ?? 因為取消操作?導致 "過早的完成"

  我們需要小心處理異常, 一般情況下我們使用try、catch包含所有的執行代碼以確保所有的異常都被處理。

?

??? 可將TPL Dataflow 做為進程內消息隊列,本文只是一個入門參考,更多復雜用法還是看官網, 你需要記住的是, 這是一個.Net 進程內數據流組件, 能讓你專注于流程。

?

作者:JulianHuang

感謝您的認真閱讀,如有問題請大膽斧正;覺得有用,請下方或加關注。

本文歡迎轉載,但請保留此段聲明,且在文章頁面明顯位置注明本文的作者及原文鏈接。

轉載于:https://www.cnblogs.com/JulianHuang/p/11177766.html

總結

以上是生活随笔為你收集整理的TPL Dataflow .Net 数据流组件,了解一下?的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。