轻量级消息队列RedisQueue
消息隊列(Message Queue)是分布式系統必不可少的中間件,大部分消息隊列產品(如RocketMQ/RabbitMQ/Kafka等)要求團隊有比較強的技術實力,不適用于中小團隊,并且對.NET技術的支持力度不夠。而Redis實現的輕量級消息隊列很簡單,僅有Redis常規操作,幾乎不需要開發團隊掌握額外的知識!
隨著強大的.NET5發布,.NET技術棧里面怎可沒有最佳的消息隊列搭檔?
本文從高性能Redis組件 NewLife.Redis 出發,借用快遞業務場景,講解.NET中如何使用Redis作為消息隊列,搭建企業級分布式系統架構!
什么是消息隊列
消息隊列就是消息在傳輸過程中保存消息的容器,其核心功用是削峰和解耦!
早高峰,快遞公司的貨車前來各驛站卸貨,多名站點工作人員使用PDA掃描到站,大量信息進入系統(1000tps),而通知快遞公司的接口只有400tps的處理能力。
通過增加MQ來保存消息,讓超過系統處理能力的消息滯留下來,等早高峰過后,系統即可完成處理。此為削峰!
在快遞柜業務流程中,快遞員投柜后需要經歷扣減系統費、短信通知用戶和推送通知快遞公司三個業務動作。傳統做法需要依次執行這些業務東西,如果其中某一步異常(例如用戶手機未開機或者快遞公司接口故障),將會延遲甚至中斷整個投柜流程,嚴重影響用戶體驗。
如果接口層收到投柜數據后,寫入消息到MQ,后續三個子系統各自消費處理,將可以完美解決該問題,并且子系統故障不影響上游系統!此為解耦!
內存消息隊列
最簡單的消息隊列,可以由阻塞集合BlockingCollection實現
public static void Start() {var queue = new BlockingCollection<Area>();// 獨立線程消費var thread = new Thread(s => Consume(queue));thread.Start();// 發布消息Public(queue); } private static void Public(BlockingCollection<Area> queue) {var area = new Area { Code = 110000, Name = "北京市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);area = new Area { Code = 310000, Name = "上海市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000);area = new Area { Code = 440100, Name = "廣州市" };XTrace.WriteLine("Public {0} {1}", area.Code, area.Name);queue.Add(area);Thread.Sleep(1000); } private static void Consume(BlockingCollection<Area> queue) {while (true){var msg = queue.Take();if (msg != null){XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);}} }每秒鐘生產一個消息,都被獨立線程消費到。
Redis做消息隊列
Redis的LIST結構,具備左進右出的功能,再使用BRPOP的阻塞彈出,即可完成一個最基本的消息隊列 RedisQueue<T>。
GetQueue取得隊列后,Add方法發布消息。
TakeOne拉取消費一條消息,指定10秒阻塞,10秒內有消息立馬返回,否則等到10秒超時后返回空。
public static void Start(FullRedis redis) {var topic = "EasyQueue";// 獨立線程消費var thread = new Thread(s => Consume(redis, topic));thread.Start();// 發布消息Public(redis, topic); } private static void Public(FullRedis redis, String topic) {var queue = redis.GetQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "廣州市" });Thread.Sleep(1000); } private static void Consume(FullRedis redis, String topic) {var queue = redis.GetQueue<Area>(topic);while (true){var msg = queue.TakeOne(10);if (msg != null){XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);}} }LPUSH 生產消息(插入列表),BRPOP 消費消息(彈出列表),因此,消息被消費后就消失了!
從日志時間可以看到,生產與消費的時間差在1~3ms之間,延遲極低!
注釋消費代碼后重跑,可以在Redis中看到發布的消息
需要確認的隊列
如果通知快遞公司的物流推送子系統處理消息時出錯,消息丟失怎么辦?顯然不可能讓上游再發一次!
這里我們需要支持消費確認的可信隊列 RedisReliableQueue<T>。消費之后,除非程序主動確認消費,否則Redis不許刪除消息。
GetReliableQueue獲取隊列實例后,Add發布消息,TakeOneAsync異步消費一條消息,并指定10秒阻塞超時,處理完成后再通過Acknowledge確認。
public static void Start(FullRedis redis) {var topic = "AckQueue";// 獨立線程消費var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 發布消息Public(redis, topic);source.Cancel(); } private static void Public(FullRedis redis, String topic) {var queue = redis.GetReliableQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "廣州市" });Thread.Sleep(1000); } private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token) {var queue = redis.GetReliableQueue<String>(topic);while (!token.IsCancellationRequested){var mqMsg = await queue.TakeOneAsync(10);if (mqMsg != null){var msg = mqMsg.ToJsonEntity<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg);}} }LPUSH 生產消息(插入列表),BRPOPLPUSH 消費消息(彈出列表并插入另一個Ack列表),這是確保不丟消息的關鍵。LREM 從Ack列表刪除,用于消費完成后確認。
如果消費異常,就不會執行該確認操作,滯留在Ack列表的消息,60秒后重新回來主列表。
腦筋急轉彎:如果應用進程異常退出,未確認的消息該怎么處理??
注釋消費代碼后重跑,可以在Redis中看到發布的消息,跟普通隊列一樣,使用了LIST結構
處理“北京市”消息時,如果沒有Acknowledge確認,Redis里面將會看到一個名為AckQueue:Ack:*的LIST結構,里面保存這這一條消息。所以,可信隊列本質上就是在消費時,同步把消息備份到另一個LIST里面,確認操作就是從待確認LIST里面刪除。
自從有了這個可信隊列,基本上足夠滿足90%以上業務需求。
延遲隊列
某一天,小馬哥說,快遞員投柜一定時間時候,如果用戶沒有來取件,那么系統需要收取超期取件費,需要一個延遲隊列。
于是想到了Redis的ZSET,我們再來一個 RedisDelayQueue<T>,Add生產消息時多了一個參數,指定若干秒后可以消費到該消息,消費用法跟可信隊列一樣。
public static void Start(FullRedis redis) {var topic = "DelayQueue";// 獨立線程消費var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 發布消息Public(redis, topic);source.Cancel(); } private static void Public(FullRedis redis, String topic) {var queue = redis.GetDelayQueue<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" }, 2);Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" }, 2);Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "廣州市" }, 2);Thread.Sleep(1000); } private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token) {var queue = redis.GetDelayQueue<String>(topic);while (!token.IsCancellationRequested){var mqMsg = await queue.TakeOneAsync(10);if (mqMsg != null){var msg = mqMsg.ToJsonEntity<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg);}} }上圖可以看到,每秒生產一個消息,2秒后消費到北京市,再過1秒消費到上海市(距離上海市的發布剛好2秒)。這里少了廣州市,因為測試程序在生產廣州市后,只等了1秒就退出。
我們從Redis中可以看到廣州市這一條消息,存放在ZSET結構中。
多消費組可重復消費的隊列
又一天,數據中臺的小伙伴想要消費訂單隊列,但是不能夠啊,LIST結構做的隊列,每個消息只能被消費一次,如果數據中臺的系統消費掉了,其它業務系統就會失去消息。
我們想到了Redis5.0開始新增的STREAM結構,再次封裝RedisStream。
public static void Start(FullRedis redis) {var topic = "FullQueue";var queue = redis.GetStream<String>(topic);// 獨立線程消費var source = new CancellationTokenSource();Task.Run(() => ConsumeAsync(redis, topic, source.Token));// 發布消息Public(redis, topic);//source.Cancel(); } private static void Public(FullRedis redis, String topic) {var queue = redis.GetStream<Area>(topic);queue.Add(new Area { Code = 110000, Name = "北京市" });Thread.Sleep(1000);queue.Add(new Area { Code = 310000, Name = "上海市" });Thread.Sleep(1000);queue.Add(new Area { Code = 440100, Name = "廣州市" });Thread.Sleep(1000); } private static async Task ConsumeAsync(FullRedis redis, String topic, CancellationToken token) {var queue = redis.GetStream<String>(topic);queue.Group = "test";queue.GroupCreate(queue.Group);while (!token.IsCancellationRequested){try{var mqMsg = await queue.TakeMessageAsync(10);if (mqMsg != null){var msg = mqMsg.GetBody<Area>();XTrace.WriteLine("Consume {0} {1}", msg.Code, msg.Name);queue.Acknowledge(mqMsg.Id);}}catch (Exception ex){XTrace.WriteException(ex);}} }生產過程不變,消費大循環有點特別,主要是STREAM消費回來的消息,有它自己的Id,只需要對這個Id確認就可以了。
上圖中,紅色框是生產,紫色框是消費。
再來看看Redis中,可以看到STREAM消息還在里面。數據中臺組只需要使用不同的消費組Group,即可獨立消費,不用擔心搶其它系統消息啦。
最佳實踐
RedisQueue在中通大數據分析中,用于緩沖等待寫入Oracle/MySql的數據,多線程計算后寫入隊列,然后由專門線程定時拉取一批(500行),執行批量Insert/Update操作。該系統隊列,每天10億條消息,Redis內存分配8G,實際使用小于100M,除非消費端故障導致產生積壓。
遞易智能科技全部使用可信隊列 RedisReliableQueue,約200多個隊列,按系統分布在各自的Redis實例,公有云2G內存主從版。積壓消息小于10萬時,隊列專用的Redis實例內存占用小于100M,幾乎不占內存空間。
公司業務每天帶來100萬多訂單,由此衍生的消息數約1000萬條,從未丟失消息!
例程代碼
代碼:https://github.com/NewLifeX/NewLife.Redis/tree/master/QueueDemo
總結
以上是生活随笔為你收集整理的轻量级消息队列RedisQueue的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2020,你收获了什么?又失去了什么?
- 下一篇: 对 Redis 中的有序集合Sorted