日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > asp.net >内容正文

asp.net

如何利用.NETCore向Azure EventHubs准实时批量发送数据?

發布時間:2023/12/4 asp.net 80 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何利用.NETCore向Azure EventHubs准实时批量发送数据? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

最近在做一個基于Azure云的物聯網分析項目:.netcore采集程序向Azure事件中心(EventHubs)發送數據,通過Azure EventHubs Capture轉儲到Azure BlogStorage,供數據科學團隊分析。

為什么使用Azure事件中心?

Azure事件中心是一種Azure上完全托管的實時數據攝取服務, 每秒可流式傳輸來自website、app、device任何源的數百萬個事件。提供的統一流式處理平臺和時間保留緩沖區,將事件生成者和事件使用者分開。

  • 事件生成者:可使用https、AQMP協議發布事件

  • 分區:事件中心通過分區使用者模式提供消息流式處理功能,提高可用性和并行化

  • 事件接收者:所有事件中心使用者通過AMQP 1.0會話進行連接,讀取數據

例如,如果事件中心具有四個分區,并且其中一個分區要在負載均衡操作中從一臺服務器移動到另一臺服務器,則仍可以通過其他三個分區進行發送和接收。此外,具有更多分區可以讓更多并發讀取器處理數據,從而提高聚合吞吐量。了解分布式系統中分區和排序的意義是解決方案設計的重要方面。?為了幫助說明排序與可用性之間的權衡,請參閱 CAP 定理

最直觀的方式:請在portal.azure.cn門戶站點---->創建事件中心命名空間---> 創建事件中心

.NetCore 準實時批量發送數據到事件中心

.NET庫 (Azure.Messaging.EventHubs)

我們使用Asp.NetCore以Azure App Service形式部署,依賴Azure App Service的自動縮放能錄應對物聯網的潮汐大流量。

通常推薦批量發送到事件中心,能有效增加web服務的吞吐量和響應能力。
目前新版SDk:Azure.Messaging.EventHubs僅支持分批發送。

  • nuget上引入Azure.Messaging.EventHubs庫

  • EventHubProducerClient客戶端負責分批發送數據到事件中心,根據發送時指定的選項,事件數據可能會自動路由到可用分區或發送到特定請求的分區。

  • 在以下情況下,建議允許自動路由分區:
    1) 事件的發送必須高度可用
    2) 事件數據應在所有可用分區之間平均分配。
    自動路由分區的規則:
    1)使用循環法將事件平均分配到所有可用分區中
    2)如果某個分區不可用,事件中心將自動檢測到該分區并將消息轉發到另一個可用分區。

    我們要注意,根據選定的 命令空間定價層, 每批次發給事件中心的最大消息字節大小也不一樣:

    分段批量發送策略

    這里我們就需要思考:web程序收集數據是以個數為單位;但是我們分批發送時要根據分批的字節大小來切分。
    我的方案是:因引入TPL Dataflow 管道:

  • web程序收到數據,立刻丟入TransformBlock<string, EventData>

  • 轉換到EventData之后,使用BatchBlock<EventData>按照配置的個數打包

  • 利用ActionBlock<EventData[]>在包內 累積指定字節大小批量發送

    • 最后我們設置一個定時器(5min),強制在BatchBlock的前置隊列未滿時打包發送。

    核心的TPL Dataflow代碼如下:

    public?class?MsgBatchSender{private?readonly?EventHubProducerClient?Client;private?readonly?TransformBlock<string,?EventData>?_transformBlock;private?readonly?BatchBlock<EventData>?_packer;private?readonly?ActionBlock<EventData[]>?_batchSender;private?readonly?DataflowOption?_dataflowOption;private?readonly?Timer?_trigger;private?readonly?ILogger?_logger;public?MsgBatchSender(EventHubProducerClient?client,?IOptions<DataflowOption>?option,ILoggerFactory?loggerFactory){Client?=?client;_dataflowOption?=?option.Value;var?dfLinkoption?=?new?DataflowLinkOptions?{?PropagateCompletion?=?true?};_transformBlock?=?new?TransformBlock<string,?EventData>(text?=>?new?EventData(Encoding.UTF8.GetBytes(text)),new?ExecutionDataflowBlockOptions{MaxDegreeOfParallelism?=?_dataflowOption.MaxDegreeOfParallelism});_packer?=?new?BatchBlock<EventData>(_dataflowOption.BatchSize);_batchSender?=?new?ActionBlock<EventData[]>(msgs=>?BatchSendAsync(msgs));_packer.LinkTo(_batchSender,?dfLinkoption);_transformBlock.LinkTo(_packer,?dfLinkoption,?x?=>?x?!=?null);_trigger?=?new?Timer(_?=>?_packer.TriggerBatch(),?null,?TimeSpan.Zero,?TimeSpan.FromSeconds(_dataflowOption.TriggerInterval));_logger?=?loggerFactory.CreateLogger<DataTrackerMiddleware>();}private?async?Task?BatchSendAsync(EventData[]?msgs){try{if?(msgs?!=?null){var?i?=?0;while?(i?<?msgs.Length){var?batch?=?await?Client.CreateBatchAsync();while?(i?<?msgs.Length){if?(batch.TryAdd(msgs[i++])?==?false){break;}}if(batch!=?null?&&?batch.Count>0){await?Client.SendAsync(batch);batch.Dispose();}}}}catch?(Exception?ex){//?ignore?and?log?any?exception_logger.LogError(ex,?"SendEventsAsync:?{error}",?ex.Message);}}public??async?Task<bool>?PostMsgsync(string?txt){return?await?_transformBlock.SendAsync(txt);}public?async?Task?CompleteAsync(){_transformBlock.Complete();await?_transformBlock.Completion;await?_batchSender.Completion;await?_batchSender.Completion;}}

    總結

    • Azure事件中心的基礎用法

    • .NET Core準實時分批向Azure事件中心發送數據,其中用到的TPL Dataflow以actor模型:提供了粗粒度的數據流和流水線任務,提高了高并發程序的健壯性。?

      源碼地址:https://github.com/zaozaoniao/SaicEnergyTracker

    • TPL Dataflow組件應對高并發,低延遲要求

    • ASP.NET Core跨平臺技術內幕

    • AspNetCore結合Redis實踐消息隊列

    • Quartz.net在集群環境下部署任務的姿勢

    • 基于docker-compose的Gitlab CI/CD實踐&排坑指南

    總結

    以上是生活随笔為你收集整理的如何利用.NETCore向Azure EventHubs准实时批量发送数据?的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。