日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

轻量级消息队列RedisQueue

發布時間:2023/12/4 数据库 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 轻量级消息队列RedisQueue 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

消息隊列(Message Queue)是分布式系統必不可少的中間件,大部分消息隊列產品(如RocketMQ/RabbitMQ/Kafka等)要求團隊有比較強的技術實力,不適用于中小團隊,并且對.NET技術的支持力度不夠。而Redis實現的輕量級消息隊列很簡單,僅有Redis常規操作,幾乎不需要開發團隊掌握額外的知識!

隨著強大的.NET5發布,.NET技術棧里面怎可沒有最佳的消息隊列搭檔?

本文從高性能Redis組件 NewLife.Redis 出發,借用快遞業務場景,講解.NET中如何使用Redis作為消息隊列,搭建企業級分布式系統架構!

什么是消息隊列

消息隊列就是消息在傳輸過程中保存消息的容器,其核心功用是削峰解耦

早高峰,快遞公司的貨車前來各驛站卸貨,多名站點工作人員使用PDA掃描到站,大量信息進入系統(1000tps),而通知快遞公司的接口只有400tps的處理能力。

通過增加MQ來保存消息,讓超過系統處理能力的消息滯留下來,等早高峰過后,系統即可完成處理。此為削峰

在快遞柜業務流程中,快遞員投柜后需要經歷扣減系統費、短信通知用戶和推送通知快遞公司三個業務動作。傳統做法需要依次執行這些業務東西,如果其中某一步異常(例如用戶手機未開機或者快遞公司接口故障),將會延遲甚至中斷整個投柜流程,嚴重影響用戶體驗。

如果接口層收到投柜數據后,寫入消息到MQ,后續三個子系統各自消費處理,將可以完美解決該問題,并且子系統故障不影響上游系統!此為解耦

內存消息隊列

最簡單的消息隊列,可以由阻塞集合BlockingCollection實現

public static void Start() {var queue = new BlockingCollection<Area>();// 獨立線程消費var thread = new Thread(s => Consume(queue));thread.Start();// 發布消息Public(queue); } private static void Public(BlockingCollection<Area> queue) {var area = new Area { Code = 110000, Name = "北京市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);area = new Area { Code = 310000, Name = "上海市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);area = new Area { Code = 440100, Name = "廣州市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000); } private static void Consume(BlockingCollection<Area> queue) {while (true){var msg = queue.Take();if (msg != null){XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);}} }

每秒鐘生產一個消息,都被獨立線程消費到。

Redis做消息隊列

Redis的LIST結構,具備左進右出的功能,再使用BRPOP的阻塞彈出,即可完成一個最基本的消息隊列 RedisQueue<T>。

GetQueue取得隊列后,Add方法發布消息。

TakeOne拉取消費一條消息,指定10秒阻塞,10秒內有消息立馬返回,否則等到10秒超時后返回空。

public static void Start(FullRedis redis) {var topic = "EasyQueue";// 獨立線程消費var thread = new Thread(s => Consume(redis, topic));thread.Start();// 發布消息Public(redis, topic); } private static void Public(FullRedis redis, String topic) {var queue = redis.GetQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "廣州市" });Thread.Sleep(1000); } private static void Consume(FullRedis redis, String topic) {var queue = redis.GetQueue<Area>(topic);while (true){var msg = queue.TakeOne(10);if (msg != null){XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);}} }

LPUSH 生產消息(插入列表),BRPOP 消費消息(彈出列表),因此,消息被消費后就消失了!

從日志時間可以看到,生產與消費的時間差在1~3ms之間,延遲極低!

注釋消費代碼后重跑,可以在Redis中看到發布的消息

需要確認的隊列

如果通知快遞公司的物流推送子系統處理消息時出錯,消息丟失怎么辦?顯然不可能讓上游再發一次!

這里我們需要支持消費確認的可信隊列 RedisReliableQueue<T>。消費之后,除非程序主動確認消費,否則Redis不許刪除消息。

GetReliableQueue獲取隊列實例后,Add發布消息,TakeOneAsync異步消費一條消息,并指定10秒阻塞超時,處理完成后再通過Acknowledge確認。

public static void Start(FullRedis redis) {var topic = "AckQueue";// 獨立線程消費var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 發布消息Public(redis, topic);source.Cancel(); } private static void Public(FullRedis redis, String topic) {var queue = redis.GetReliableQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "廣州市" });Thread.Sleep(1000); } private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token) {var queue = redis.GetReliableQueue<String>(topic);while (!token.IsCancellationRequested){var mqMsg = await queue.TakeOneAsync(10);if (mqMsg != null){var msg = mqMsg.ToJsonEntity<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg);}} }

LPUSH 生產消息(插入列表),BRPOPLPUSH 消費消息(彈出列表并插入另一個Ack列表),這是確保不丟消息的關鍵。LREM 從Ack列表刪除,用于消費完成后確認。

如果消費異常,就不會執行該確認操作,滯留在Ack列表的消息,60秒后重新回來主列表。

腦筋急轉彎:如果應用進程異常退出,未確認的消息該怎么處理??

注釋消費代碼后重跑,可以在Redis中看到發布的消息,跟普通隊列一樣,使用了LIST結構

處理“北京市”消息時,如果沒有Acknowledge確認,Redis里面將會看到一個名為AckQueue:Ack:*的LIST結構,里面保存這這一條消息。所以,可信隊列本質上就是在消費時,同步把消息備份到另一個LIST里面,確認操作就是從待確認LIST里面刪除。

自從有了這個可信隊列,基本上足夠滿足90%以上業務需求。

延遲隊列

某一天,小馬哥說,快遞員投柜一定時間時候,如果用戶沒有來取件,那么系統需要收取超期取件費,需要一個延遲隊列。

于是想到了Redis的ZSET,我們再來一個 RedisDelayQueue<T>,Add生產消息時多了一個參數,指定若干秒后可以消費到該消息,消費用法跟可信隊列一樣。

public static void Start(FullRedis redis) {var topic = "DelayQueue";// 獨立線程消費var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 發布消息Public(redis, topic);source.Cancel(); } private static void Public(FullRedis redis, String topic) {var queue = redis.GetDelayQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" }, 2);Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" }, 2);Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "廣州市" }, 2);Thread.Sleep(1000); } private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token) {var queue = redis.GetDelayQueue<String>(topic);while (!token.IsCancellationRequested){var mqMsg = await queue.TakeOneAsync(10);if (mqMsg != null){var msg = mqMsg.ToJsonEntity<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg);}} }

上圖可以看到,每秒生產一個消息,2秒后消費到北京市,再過1秒消費到上海市(距離上海市的發布剛好2秒)。這里少了廣州市,因為測試程序在生產廣州市后,只等了1秒就退出。

我們從Redis中可以看到廣州市這一條消息,存放在ZSET結構中。

多消費組可重復消費的隊列

又一天,數據中臺的小伙伴想要消費訂單隊列,但是不能夠啊,LIST結構做的隊列,每個消息只能被消費一次,如果數據中臺的系統消費掉了,其它業務系統就會失去消息。

我們想到了Redis5.0開始新增的STREAM結構,再次封裝RedisStream。

public static void Start(FullRedis redis) {var topic = "FullQueue";var queue = redis.GetStream<String>(topic);// 獨立線程消費var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 發布消息Public(redis, topic);//source.Cancel(); } private static void Public(FullRedis redis, String topic) {var queue = redis.GetStream<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "廣州市" });Thread.Sleep(1000); } private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token) {var queue = redis.GetStream<String>(topic);queue.Group = "test";queue.GroupCreate(queue.Group);while (!token.IsCancellationRequested){try{var mqMsg = await queue.TakeMessageAsync(10);if (mqMsg != null){var msg = mqMsg.GetBody<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg.Id);}}catch (Exception ex){XTrace.WriteException(ex);}} }

生產過程不變,消費大循環有點特別,主要是STREAM消費回來的消息,有它自己的Id,只需要對這個Id確認就可以了。

上圖中,紅色框是生產,紫色框是消費。

再來看看Redis中,可以看到STREAM消息還在里面。數據中臺組只需要使用不同的消費組Group,即可獨立消費,不用擔心搶其它系統消息啦。

最佳實踐

RedisQueue在中通大數據分析中,用于緩沖等待寫入Oracle/MySql的數據,多線程計算后寫入隊列,然后由專門線程定時拉取一批(500行),執行批量Insert/Update操作。該系統隊列,每天10億條消息,Redis內存分配8G,實際使用小于100M,除非消費端故障導致產生積壓。

遞易智能科技全部使用可信隊列 RedisReliableQueue,約200多個隊列,按系統分布在各自的Redis實例,公有云2G內存主從版。積壓消息小于10萬時,隊列專用的Redis實例內存占用小于100M,幾乎不占內存空間。

公司業務每天帶來100萬多訂單,由此衍生的消息數約1000萬條,從未丟失消息!

例程代碼

代碼:https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo

總結

以上是生活随笔為你收集整理的轻量级消息队列RedisQueue的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。