日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

使用 Redis Stream 实现消息队列

發(fā)布時間:2023/12/4 数据库 25 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用 Redis Stream 实现消息队列 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

使用 Redis Stream 實現(xiàn)消息隊列

Intro

Redis 5.0 中增加了 Stream 的支持,利用 Stream 我們可以實現(xiàn)可靠的消息隊列,并且支持一個消息被多個消費者所消費,可以很好的實現(xiàn)消息隊列

Simple Usage

首先我們來看一個簡單版本的 Stream 使用,我們在代碼里使用一個發(fā)布者,一個消費者來模擬一個簡單的消息隊列的場景

來看下面的測試代碼:

private?const?string?StreamKey?=?"test-simple-stream";public?static?async?Task?MainTest() {await?RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);//?register?background?consumer_?=?Task.Factory.StartNew(Consume).ConfigureAwait(false);//await?Publish(); }private?static?async?Task?Publish() {Console.WriteLine("Press?Enter?to?publish?messages,?Press?Q?to?exit");var?input?=?Console.ReadLine();while?(input?is?not?"q"?and?not?"Q"){var?redis?=?RedisHelper.GetDatabase();for?(var?i?=?0;?i?<?10;?i++){await?redis.StreamAddAsync(StreamKey,?"message",?$"test_message_{i}");}input?=?Console.ReadLine();} }private?static?async?Task?Consume() {var?lastMsgId?=?"0-0";while?(true){await?InvokeHelper.TryInvokeAsync(async?()?=>{var?redis?=?RedisHelper.GetDatabase();var?entries?=?await?redis.StreamReadAsync(StreamKey,?lastMsgId,?2);if?(entries.Length?==?0){return;}foreach?(var?entry?in?entries){Console.WriteLine(entry.Id);entry.Values.Dump();//?delete?message?if?you?want//?redis.StreamDelete(StreamKey,?new[]?{?entry.Id?});}lastMsgId?=?entries[^1].Id;});await?Task.Delay(200);} }

上面的代碼會使用一個后臺線程來運行一個 Consumer 來從 Stream 中讀取消息,有兩種消費消息的模式,一種是自己維護(hù)一個處理的消息 offset,每次從這個 offset 之后讀取新消息,另外一種模式不需要維護(hù)本地的 offset,可以在處理完消息之后直接刪掉消息,默認(rèn)消息是不會刪消息的,所以如果不刪消息的話需要維護(hù)

Publisher 每次會發(fā)布 10 條消息,Consumer 每次會讀取兩條消息,處理之后會等待 200 ms,之后再查詢消息

來看一下運行效果吧:

Consumer Group

上面的示例會相對來說比較簡單,只有一個 Consumer,但是在比較常用的場景下往往會有多個消費者處理,

比如說用戶注冊成功之后,發(fā)布一條消息可能會有多個 Consumer 同時給用戶發(fā)郵件或短信以及給用戶加積分等操作,這種場景下使用上面的模式就不合適了,Redis Stream 中增加了 Consumer Group 的概念(有的人甚至稱 Redis 內(nèi)置了一個 Kafka),在創(chuàng)建了 Consumer Group 之后,向 Stream 發(fā)布消息的時候會廣播到各個 Consumer Group 中,每個 Consumer Group 的消息消費是獨立的,不同的 Consumer Group 的消費速度可以不一致,一個 Consumer Group 也可以有多個 Consumer 同時運行,同一個 Group 內(nèi)的多個 Consumer 是會共享一個 Consumer Group 的消息消費,而且我們可以手動進(jìn)行消息的 ACK

來看下面的示例代碼吧:

private?const?string?StreamKey?=?"test-stream-group"; private?static?int?_consumerCount;public?static?async?Task?MainTest() {await?RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);//?register?background?consumer_?=?await?Task.Factory.StartNew(Consume).ConfigureAwait(false);_?=?await?Task.Factory.StartNew(Consume).ConfigureAwait(false);//await?Publish(); }private?static?async?Task?Publish() {Console.WriteLine("Press?Enter?to?publish?messages,?Press?Q?to?exit");var?input?=?Console.ReadLine();while?(input?is?not?"q"?and?not?"Q"){var?redis?=?RedisHelper.GetDatabase();for?(var?i?=?0;?i?<?10;?i++){await?redis.StreamAddAsync(StreamKey,?"message",?$"test_message_{i}");}input?=?Console.ReadLine();} }private?static?async?Task?Consume() {Interlocked.Increment(ref?_consumerCount);var?groupName?=?$"group-{_consumerCount}";var?consumerName?=?$"consumer-{_consumerCount}";var?redis?=?RedisHelper.GetDatabase();redis.StreamCreateConsumerGroup(StreamKey,?groupName);while?(true){await?InvokeHelper.TryInvokeAsync(async?()?=>{var?messages?=?await?redis.StreamReadGroupAsync(StreamKey,?groupName,?consumerName,?count:?SecurityHelper.Random.Next(1,?4));if?(messages.Length?==?0){return;}foreach?(var?message?in?messages){Console.WriteLine($"{groupName}-{message.Id}-{message.Values.ToJson()}");await?redis.StreamAcknowledgeAsync(StreamKey,?groupName,?message.Id);}});await?Task.Delay(200);} }

上面的示例代碼會先注冊兩個 Consumer Group,兩個 Consumer Group 內(nèi)各有一個 consumer,你也可以使用多個 consumer,為了體現(xiàn)各個 Consumer Group 是獨立的,每次獲取消息的 Count 是會隨機(jī)指定的,在讀取的消息之后會輸出消息內(nèi)容來代替處理消息的邏輯,處理完成之后進(jìn)行消息的 ACK,消息的發(fā)布邏輯和上面的示例是類似的

上述代碼執(zhí)行輸出示例:


可以看到我們發(fā)布的消息,每一個 consumer group 都會處理消息,而且處理消息的速度是獨立的,互不影響

通過 XINFO 命令我們可以對 Stream 做一些監(jiān)控

More

利用 Redis 的 Stream 我們可以實現(xiàn)可靠的一個消息機(jī)制,stream 的每一條消息都會有一個消息 Id,默認(rèn)是兩個部分,一個部分是時間戳,另一個部分是一個序列號,消息 Id 可以自定義,但是通常情況下推薦用默認(rèn)的 id

Redis 中的 List、HashSet、Set、ZSet 這些數(shù)據(jù)類型中沒有元素的時候會把對應(yīng)的 Key 也會刪掉,但是 Stream 是不會的,Stream 允許沒有消息的時候依然存在

Redis Stream 使用的時候需要注意我們是可以指定 Stream 的消息長度的,如果我們指定了最大消息長度 10000,超出 10000 的時候舊消息就會被擠出隊列,可能會出現(xiàn)消息的丟失,需要對 Stream 做必要的監(jiān)控和報警

References

  • https://redis.io/topics/streams-intro

  • https://redis.io/commands

  • https://github.com/WeihanLi/SamplesInPractice/tree/master/RedisSample

總結(jié)

以上是生活随笔為你收集整理的使用 Redis Stream 实现消息队列的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。