日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

AspNetCore结合Redis实践消息队列

發布時間:2023/12/4 数据库 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 AspNetCore结合Redis实践消息队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

這是年中首發在博客園上的文章,個人覺得是AspNetCore結合Redis做的一次比較優秀的消息隊列重構,其中對于點對點/發布-訂閱的思路應該也是面試必考題。

引言

  .Net?TPL?Dataflow是一個進程內數據流管道,應對高并發、低延遲的要求非常有效, 但在實際Docker部署的過程中, 有一個問題一直無法回避:

單體程序部署的瞬間(服務不可用)會有少量流量無法處理;

更糟糕的情況下,迭代部署的這個版本有問題,上線后無法工作, 導致更多流量沒有處理。

????背負神圣使命(巨大壓力)的程序猿心生一計,為何不將單體程序改成分布式:

增加服務ReceiverApp,ReceiverApp只接受數據,WebApp只處理數據。

知識儲備

? ? 消息隊列和訂閱發布作為老生常談的兩個知識點被反復提及,按照JMS的規范, 官方稱為點對點(point to point, queue)和發布/訂閱(publish/subscribe,topic)

點對點

????生產者發送消息到Message Queue中,然后消費者從隊列中取出消息并消費。

隊列會保留消息,直到他們被消費或超時;?

①?MQ支持多消費者,但每個消息只能被一個消費者處理

②?發送者和消費者在時間上沒有依賴性,當發送者發送消息之后,不管消費者有沒有在運行(甚至不管有沒有消費者),都不會影響到消息被發送到隊列

③ 一般消費者在消費之后需要向隊列應答成功

如果你希望發送的每個消息都應該被成功處理,你應該使用p2p模型

發布/訂閱

  消息生產者將消息發布到Channel,在此之前已有多個消費者訂閱該通道。

和點對點方式不同,發布到特定通道的消息會被通道訂閱者收到。

通道沒有暫存隊列機制,發布的消息只能被當前收聽的訂閱者接收到

① 每個消息可以有多個訂閱者

② 發布者和消費者有時間上依賴性:某通道的訂閱者,必須先創建該通道訂閱,才能收到消息

發布消息至通道,不關注訂閱者是誰;訂閱者可收聽自己感興趣的多個通道(類似于topic),也不關注發布者是誰。

③ 故如果沒有訂閱者,發布的消息將得不到處理;

頭腦風暴

Redis內置的List數據結構能形成輕量級消息隊列的效果;Redis原生支持發布/訂閱?模型

如上分析, Pub/Sub模型在訂閱者宕機的時候,發布的消息得不到處理,故此模型不能用于強業務的數據接收和處理。

本次采用的消息隊列模型:

  • 解耦業務:新建ReceiverApp作為生產者,專注于接收并發送到隊列;原有的WebApp作為消費者專注數據處理。

  • 起到削峰填谷的作用,若縮放出多個WebApp消費者容器,還能形成負載均衡的效果。?

需要關注Redis操作List結構的兩個命令( 左進右出,右進左出同理):

? ? LPUSH? &? RPOP/BRPOP

Brpop中的B 表示"Block",是一個rpop命令的阻塞版本:若指定List沒有新元素,在給定時間內,該命令會阻塞當前redis客戶端連接,直到超時返回nil

AspNetCore編程實踐

本次使用AspNetCore 完成RedisMQ的實踐,引入Redis國產第三方開源庫CSRedisCore

生產者ReceiverApp

生產者使用LPush命令向Redis List數據結構寫入消息。

------------------截取自Startup.cs------------------------- public void ConfigureServices(IServiceCollection services) {// Redis客戶端要定義成單例, 不然在大流量并發收數的時候, 會造成redis client來不及釋放。另一方面也確認api控制器不是單例模式,var csredis = new CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");RedisHelper.Initialization(csredis);services.AddSingleton(csredis);services.AddMvc(); } ------------------截取自數據接收Controller------------------- [Route("batch")] [HttpPost] public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair> eqidPairs) {if (!ModelState.IsValid)throw new ArgumentException("Http Body Payload Error.");var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}"; eqidPairs = await EqidExtractor.EqidExtractAsync(eqidPairs);if (eqidPairs != null && eqidPairs.Any())RedisHelper.LPush(redisKey, eqidPairs.ToArray());await Task.CompletedTask;}

消費者WebApp

????根據以上RedisMQ思路,事件消費方式是拉取pull,故需要輪詢Redis? List數據結構,這里使用AspNetCore內置的BackgroundService后臺服務類后臺輪詢消費:

關注后臺Job中的循環接收方法。

public class BackgroundJob : BackgroundService {private?readonly?IEqidPairHandler?_eqidPairHandler;private readonly CSRedisClient[] _cSRedisClients;private readonly IConfiguration _conf;private readonly ILogger _logger;public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[] csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory){_eqidPairHandler = eqidPairHandler;_cSRedisClients = csRedisClients;_conf = conf;_logger = loggerFactory.CreateLogger(nameof(BackgroundJob));}protected override async Task ExecuteAsync(CancellationToken stoppingToken){_logger.LogInformation("Service starting");if (_cSRedisClients[0] == null){_cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + ",defaultDatabase=" + 0);}RedisHelper.Initialization(_cSRedisClients[0]);while (!stoppingToken.IsCancellationRequested){var key = $"eqidpair:{DateTime.Now.ToString("yyyyMMdd")}";var?eqidpair?=?RedisHelper.BRPop(5,?key);if (eqidpair != null)await _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>(eqidpair));//?強烈建議無論如何休眠一段時間,防止突發大流量導致WebApp進程CPU滿載,自行根據場景設置合理休眠時間await Task.Delay(10, stoppingToken);}_logger.LogInformation("Service stopping");} }

迭代驗證

使用docker-compose單機部署Nginx,ReceiverApp,WebApp容器。

docker-compose up指令默認只會重建[Service配置或Image變更]的容器。

If there are existing containers for a service, and the service’s configuration or image was changed after the container’s creation,?docker-compose up?picks up the changes by stopping and recreating the containers (preserving mounted volumes). To prevent Compose from picking up changes, use the?--no-recreate?flag.

做一次迭代驗證,更新docke-compose.yml文件WebApp服務的鏡像版本,

docker-compose up;

下圖顯示僅 數據處理容器 WebApp被Recreate:

Nice,分布式改造完成,效果很明顯,現在可以放心安全的迭代核心WebApp數據處理程序。

+?https://redis.io/commands/brpop

+?https://redis.io/commands/lpush

文字+制圖,均為原創,

掃碼點贊,
讓干貨飛一會。
............

往期推薦??

TPL Dataflow組件應對高并發,低延遲要求

docker stack,docker-compose前世今生

點贊的朋友年后老板加雞腿!

總結

以上是生活随笔為你收集整理的AspNetCore结合Redis实践消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。