控制 Redis stream 的消息数量
控制 Redis stream 的消息數量
Intro
Redis Stream 是 Redis 5.0 引入的一個新的類型,之前我們介紹過使用 Redis Stream 來實現消息隊列,可以參考之前的文章 使用 Redis Stream 實現消息隊列,而 Stream 的消息會持久化地內存中,如果我們不控制消息數量的話,可能會出現大量的消息存在內存里導致過大的內存占用,Redis Stream 5.0 開始支持根據 Max Length 來控制 Stream 的長度(消息數量),從 6.2 開始支持根據消息 Id 來控制 Stream 的長度,默認地消息 Id 是一個時間戳,所以使用默認地 Id 也可以理解為按時間來控制 Stream 長度,下面我們來看使用示例吧
Redis 語法
控制 Stream 消息長度有兩個 Redis 命令,一個是?XTRIM?只做?Trim?操作,把不滿足要求的消息去除,另外一個是?XADD?在添加 Stream 消息的同時做?Trim?操作,簡化還要多一步?Trim?的操作,將添加消息和控制消息長度可以合并為一個操作
XTRIM?語法:
XTRIM?key?MAXLEN|MINID?[=|~]?threshold?[LIMIT?count]使用示例:
XTRIM?mystream?MAXLEN?1000 XTRIM?mystream?MINID?649085820XTRIM?mystream?MAXLEN?~?1000(Nearly?trim,不準確,可能有些消息該刪掉的會保留下來,但是執行效率會比?`=`(Exactly?Trim)?高一些) XTRIM?mystream?MINID?=?649085820Trimming the stream can be done using one of these strategies:
MAXLEN: Evicts entries as long as the stream's length exceeds the specified?threshold, where?threshold?is a positive integer.
MINID: Evicts entries with IDs lower than?threshold, where?threshold?is a stream ID.
XADD?語法:
XADD?key?[NOMKSTREAM]?[MAXLEN|MINID?[=|~]?threshold?[LIMIT?count]]?*|ID?field?value?[field?value?...]使用示例:
redis>?XADD?mystream?*?name?Sara?surname?OConnor "1631546114612-0" redis>?XADD?mystream?*?field1?value1?field2?value2?field3?value3 "1631546114612-1"redis>?XADD?mystream?MAXLEN?~?1000?field1?value1 redis>?XADD?mystream?MINID?~?1631546460687?field1?value1XADD?允許用戶在向 Stream 里添加消息的時候控制消息的長度返回值是消息ID,默認是一個時間戳,語法如上,可以 Trim 也可以 不Trim,可以根據需要選擇
Prepare
先來準備一些幫助類和公共方法,下面的示例是基于 StackExchange.Redis 來實現的
RedisHelper,獲取 Redis 連接
internal?static?class?RedisHelper {private?static?readonly?IConnectionMultiplexer?ConnectionMultiplexer?=?StackExchange.Redis.ConnectionMultiplexer.Connect("127.0.0.1:6379");public?static?IDatabase?GetRedisDb(int?dbIndex?=?0){return?ConnectionMultiplexer.GetDatabase(dbIndex);} }AddStreamMessage,向指定 stream 中添加若干條消息
private?static?async?Task?AddStreamMessage(string?key,?int?msgCount,?Action?action=null) {var?redis?=?RedisHelper.GetRedisDb();for?(var?i?=?0;?i?<?msgCount;?i++){await?redis.StreamAddAsync(key,?"messages",?$"val-{i}");action?.Invoke();} }Max-Length
根據 MaxLength 來控制 Stream 長度示例
var?streamKey?=?$"stream-{nameof(MaxLengthTrim)}"; await?AddStreamMessage(streamKey,?10); var?redis?=?RedisHelper.GetRedisDb(); Console.WriteLine(await?redis.StreamLengthAsync(streamKey));//?trim?directly await?redis.StreamTrimAsync(streamKey,?5); Console.WriteLine(await?redis.StreamLengthAsync(streamKey));//?add?with?trim await?redis.StreamAddAsync(streamKey,?StreamMessageField,?"Test",?maxLength:?3); Console.WriteLine(await?redis.StreamLengthAsync(streamKey));await?redis.KeyDeleteAsync(streamKey);輸出結果如下:
Min-ID
根據 Min-ID 來控制 Stream 消息長度,是 Redis 6.2 新引入的功能,目前 StackExchange.Redis 還沒有專門的 API 來支持這個功能,不過我們可以通過 Execute 來執行 Redis 命令,通常這些 Redis 客戶端庫都會支持直接調用 Redis 命令,根據 MinID 控制 Stream 長度示例如下:
private?const?string?StreamAddCommandName?=?"XADD"; private?const?string?StreamTrimCommandName?=?"XTRIM";private?const?string?StreamAddAutoMsgId?=?"*";private?const?string?StreamTrimByMinIdName?=?"MINID";private?const?string?StreamTrimOperator?=?"=";private?const?string?StreamMessageField?=?"message";private?static?async?Task?MinMsgIdTrim() {var?streamKey?=?$"stream-{nameof(MaxLengthTrim)}";await?AddStreamMessage(streamKey,?10,?()?=>?Thread.Sleep(1000));var?redis?=?RedisHelper.GetRedisDb();var?minId?=?DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(5)).ToUnixTimeMilliseconds();Console.WriteLine(await?redis.StreamLengthAsync(streamKey));//?https://redis.io/commands/xtrim//?trim?directlyawait?redis.ExecuteAsync(StreamTrimCommandName,?streamKey,StreamTrimByMinIdName,StreamTrimOperator,?//?optionalminId);Console.WriteLine(await?redis.StreamLengthAsync(streamKey));minId?=?DateTimeOffset.UtcNow.Subtract(TimeSpan.FromSeconds(2)).ToUnixTimeMilliseconds();//?https://redis.io/commands/xadd//?add?with?trimvar?result?=?redis.Execute(StreamAddCommandName,?streamKey,?StreamTrimByMinIdName,?StreamTrimOperator,?//?optionalminId,StreamAddAutoMsgId,?StreamMessageField,?"Test");Console.WriteLine(await?redis.StreamLengthAsync(streamKey));await?redis.KeyDeleteAsync(streamKey); }上述代碼輸出結果如下:
More
本文主要介紹了控制 Redis Stream 的消息長度,除了介紹 Redis 本身的命令之外,也是介紹一下如何使用 StackExchange.Redis 實現調用沒有 API 支持的 Redis 命令,Redis 6.2 之后支持了很多新的特性,但是很多庫都還太支持,了解如何原生調用 Redis 命令有些時候會很有幫助
References
https://redis.io/commands/xadd
https://redis.io/commands/xtrim
https://github.com/WeihanLi/SamplesInPractice/blob/master/RedisSample/StreamTrimSample.cs
總結
以上是生活随笔為你收集整理的控制 Redis stream 的消息数量的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: docker-compose 一键部署分
- 下一篇: 12 个问题搞懂 Redis