Redis数据类型之stream类型
介紹
主要用于消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發布訂閱 (pub/sub) 來實現消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現網絡斷開、Redis 宕機等,消息就會被丟棄。
Stream 類型結構
一個消費組中每個消費者不會消費同一條信息。
常用命令
xadd
描述:在某個stream中追加消息
①是自己定義的key ;②是定義的ID,* 代表由redis自動生成,格式為:時間戳-index。index一般情況下為0,但當并發量大,同一時間新增數據量大則+1。
xlen
描述:返回結果為stream數據類型的長度
xrange
描述:獲取消息列表,會自動過濾已經刪除的消息
-表示最小值, +表示最大值
xread
我們可以在不定義消費組的情況下進行Stream消息的獨立消費,當Stream沒有新消息時,甚至可以阻塞等待。Redis設計了一個單獨的消費指令xread,可以將Stream當成普通的消息隊列(list)來使用。使用xread時,我們可以完全忽略消費組(Consumer Group)的存在,就好比Stream就是一個普通的列表(list)。
xgroup create
Stream通過xgroup create指令創建消費組(Consumer Group),需要傳遞起始消息ID參數用來初始化last_delivered_id變量。
xinfo
描述:獲取Stream信息
xreadgroup group
Stream提供了xreadgroup指令可以進行消費組的組內消費,需要提供消費組名稱、消費者名稱和起始消息ID。它同xread一樣,也可以阻塞等待新消息。讀到新消息后,對應的消息ID就會進入消費者的PEL(正在處理的消息)結構里,客戶端處理完畢后使用xack指令通知服務器,本條消息已經處理完畢,該消息ID就會從PEL中移除。
開始消費:
>號表示從當前消費組的last_delivered_id后面開始讀
每當消費者讀取一條消息,last_delivered_id變量就會前進
對比消費前后的消費組信息上圖nginx消費者消費了4條
xrange key - + 獲取消息列表
xpending
為了解決組內消息讀取但處理期間消費者崩潰帶來的消息丟失問題,STREAM 設計了 Pending 列表,用于記錄讀取但并未處理完畢的消息。命令XPENDIING 用來獲消費組或消費內消費者的未處理完畢的消息。
上面的結果我們可以看到,我們之前讀取的消息,都被記錄在Pending列表中,說明全部讀到的消息都沒有處理,僅僅是讀取了。那如何表示消費者處理完畢了消息呢?
使用命令 XACK 完成告知消息處理完成,演示如下:
xack
處理完畢后再次查看。待處理信息變成了2。
總結
以上是生活随笔為你收集整理的Redis数据类型之stream类型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: lg分屏软件支持linux吗,LG V1
- 下一篇: linux cmake编译源码,linu