日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Beetlex.Redis之Stream功能详解

發布時間:2023/12/4 数据库 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Beetlex.Redis之Stream功能详解 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

有一段時間沒有寫文章,techempower的測試規則評分竟然發生了變化,只能忘著補充一下占比權重最多的數據更新示例了和深入設計一下組件模塊化加載的設計。但在不久前有用戶問了一下組件是否支持redis的Stream功能,看了一樣相關資料后把功能實現之;接下來就介紹一下如何用Beetlex.Redis來調用redis的Stream功能。

什么是Stream

是Redis5.0的Stream是一個新的強大的支持多播的可持久化的消息隊列,它提供了消息添加,多組和多消費者一致性讀取和ack確認等功能;更詳細的介紹就不多說了可以通過網絡找到更多詳細描述。

創建Stream

組件通過RedisDB對象的GetStream訪求來創建一個Stream訪問對象,對象創建后就可以進行一系列的 XACK| XADD| XDEL| XGROUP| XLEN| XRANGE| XREAD| XREADGROUP| XREVRANGE|XTRIM等指令操作。創建代碼如下:

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream");

XADD

在介紹這個操作前先說一下Stream里存儲的格式,默認Stream消息是K-V的格式,從基礎指令上可以了解到這種結構

XADD mystream * sensor-id 1234 temperature 19.8

但這種格式操作起來并不友好,所以組件除了支持這種K-V的方式外,還支持以對象的方式進行Stream消息處理。接下來看一下插入對象的調用

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var id = await stream.Add(DataHelper.Defalut.Employees[0]); id = await stream.Add(DataHelper.Defalut.Employees[1]); id = await stream.Add(DataHelper.Defalut.Employees[2]); var len = await stream.Len();

組件支持直接入插對象,其基礎指令就是

XADD employees_stream * date employeejson

組件直接采用一個K-V的方式來存儲對象,對于原則多個K-V的方式組件同樣也支持,只是在構建Stream指定類型用Dictionary<string,string>即可;接下其他就不多說了直接上指令用例了。

XLEN

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var len = await stream.Len();

XDEL

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var items = await stream.Read(null, null, "0-0"); await stream.Del((from item in items select item.ID).ToArray());

XRANGE

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var items = await stream.Range(); items = await stream.RangeAll();

XREVRANGE

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var items = await stream.RevRange(); items = await stream.RevRangeAll();

XREAD

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var items = await stream.Read(0, null, "0-0"); items = await stream.Read();

Stream的消費組

前面介紹的指令感覺列表結構都能滿足,其實Stream重要的功能是在組消費這一塊,Redis可以針對Stream創建多個消費組和消費者,而消息會做一致性消費處理。

XGROUP

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var group = await stream.GetGroup("henry");

XREAD

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var group = await stream.GetGroup("g1"); var items = await group.Read("henry", "0");

實際XRead提供了是否等待和起始讀已取參數

public async ValueTask<List<StreamDataItem<T>>> ReadWait(string consumer,int timeout=0public ValueTask<List<StreamDataItem<T>>> Read(string consumer,string start = null) public async ValueTask<List<StreamDataItem<T>>> Read(string consumer, int? block, int? count, string start = null)

一般情況下可以通過readwait來不停地消息新的消息

while(true) {items = await group.ReadWait("henry");//處理消息foreach(var item in items){await item.Ack();} }

XACK

RedisStream<Employee> stream = DB.GetStream<Employee>("employees_stream"); var group = await stream.GetGroup("g1"); var items = await group.Read("henry", "0"); foreach (var item in items)await item.Ack();

以上是BeetleX.Redis組件提供操作Stream的基礎指令,實際上Stream還有一些和運維相關的指令,只是這些在實際業務上用不上所以就沒有去實現了。

總結

以上是生活随笔為你收集整理的Beetlex.Redis之Stream功能详解的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。