日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Redis实现消息队列之生产消费模式

發(fā)布時間:2025/3/17 数据库 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis实现消息队列之生产消费模式 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

簡單的介紹下消息隊列,使用消息隊列首先我們得有一個隊列,那么這個隊列之前講過就是先進先出的一個數(shù)據(jù)結(jié)構(gòu);那么有了隊列以后我們還需要有人在隊列里面放東西,那么這個放東西的人我們稱之為生產(chǎn)者;有了生產(chǎn)者對應(yīng)的需要一個消費者,沒有消費者這個隊列滿了就會溢出。

簡單隊列實現(xiàn)

那么我們Redis剛好有一個數(shù)據(jù)類型符合這個就是List。list可以實現(xiàn)隊列(先進先出)和棧(先進后出),那么這個list又有兩種插入數(shù)據(jù)的方式:頭插法和尾插法。所以我們今天使用的結(jié)構(gòu)是隊列,使用尾插法,關(guān)鍵的命令有rpush(尾插)和lpop(頭部獲取)。如下圖所示:

>?rpush?squeue?zhangsan?lisi?mango????#插入隊列(integer) 3>?lpop?squeue????#獲取隊列"zhangsan">?rpush?squeue?wangwu(integer) 3> lpop squeue"lisi"> lpop squeue"mango"> lpop squeue"wangwu">?lpop?squeue????#空隊列(nil)

上面是rpush/lpop結(jié)合使用的例子。還可以使用lpush/rpop結(jié)合使用,效果是一樣的。

注意:我們這里思考一個問題,在咱們實際開發(fā)中我們獲取隊列數(shù)據(jù)的時候如果這個隊列里面沒有任何值了,我們會一直pop,這樣我們的程序出現(xiàn)了一個死循環(huán),而且此時的redis會不斷的處理服務(wù)器的pop指令使之內(nèi)存增高。

此時mango靈光一閃,每次pop的時候判斷,如果隊列有值我們獲取這個值進行操作,如果隊列是空隊列,那么我們此時休息三秒鐘再請求,emmm,加雞腿。

>?lpop?squeue????#空隊列(nil).......sleep?3s?old> lpop squeue...

隊列阻塞

通過后端程序控制redis服務(wù)器休息時間是一個好辦法,但是此處有一個問題,如果說服務(wù)器在休眠的時候隊列突然進來一個值,而此時需要及時反應(yīng)獲取這個值該如何實現(xiàn)呢?

當(dāng)然,redis早就考慮到這個問題,so提供了一個叫做隊列阻塞讀,其命令blpop和brpop,就是lpop和rpop的阻塞讀方法

blpop [第一個參數(shù):key]... [第二個參數(shù):time]key可以有多個key等待time就是阻塞時間,單位默認為秒

嗯,看似完美的解決了上面的方法

問題又來了,如果說此處設(shè)置的時間稍微長一點,阻塞請求的客戶端連接再多一點那么會出現(xiàn)下一個問題,那就是空閑連接問題。如果線程一直阻塞在哪里,Redis的客戶端連接就成了閑置連接,閑置過久,服務(wù)器一般會主動斷開連接,減少閑置資源占用。這個時候blpop/brpop會拋出異常來。

所以,魚與熊掌不可兼得,開發(fā)者當(dāng)注意此處需要捕獲異常,然后重新請求。

延遲隊列

上面我們介紹了阻塞隊列,深知空閑連接會被回收出現(xiàn)異常問題,那么我們可不可以實現(xiàn)延遲隊列,我提前一段時間比如5秒獲取隊列中的元素,當(dāng)元素記錄的時間到達了我再去執(zhí)行這個值,然后又提前5秒時間去獲取隊列中的值,依次反復(fù)。即可保證我在某一時刻去執(zhí)行隊列中對應(yīng)時間的值。

我們來分析,首先保證隊列是一個有序的我們才能依次執(zhí)行,這里我們使用ZSet因為它帶有排序且不重復(fù),保證客戶端沒有提交重復(fù)數(shù)據(jù),那么值保證了,這個排序如何設(shè)計呢?我們不能使用'yyyyMMddHHmmssSSS'這種,第一個不適應(yīng)這個排序類型可能會超出,第二就是每次轉(zhuǎn)換會帶來運算轉(zhuǎn)換的消耗,所有這里我們使用時間戳。那么zset提供一個獲取某段存在數(shù)據(jù)指令zrangebyscore,這個指令能夠獲取到我們想要的數(shù)據(jù),然后通過zrem刪除zset里面的值即可完成消費。

####假設(shè)當(dāng)前時間戳是0>?zadd?dqueue?4?zhangsan?8?lisi?12?mango????#添加元素(integer) 3####提前獲取后5秒的數(shù)據(jù)>?zrangebyscore?dqueue?-inf?5?withscores????#提前獲取5秒內(nèi)的數(shù)據(jù)1)?"zhangsan"????#值2)?4.0???????????#當(dāng)前排序索引>?zrem?dqueue?zhangsan?????#消費后刪除元素1####當(dāng)前時間戳來到了5秒,提前5秒獲取就加上這個>?zrangebyscore?dqueue?-inf?10?withscores????#獲取10以內(nèi)的數(shù)據(jù),輪訓(xùn)調(diào)用1) "lisi"2) 8.0

寫在最后

我們首先實現(xiàn)簡單的生成消費模式,針對這種簡單輪詢我們通過有數(shù)據(jù)和無數(shù)據(jù)讓這個輪詢實現(xiàn)休息策略來優(yōu)化,需求更改需要及時響應(yīng)生產(chǎn)者的數(shù)據(jù)我們使用了阻塞隊列來減少響應(yīng)延時,通過分析我們又發(fā)現(xiàn)阻塞隊列會被回收的問題,我們又雙叒叕進行了優(yōu)化,通過zset來實現(xiàn)延遲隊列。

你以為這樣就萬無一失了,其實還存在一個問題那就是不能保證原子性,我們的zrangebyscore和zrem不能在同一原子內(nèi)執(zhí)行,這里只是針對的一個客戶端,如果有多個客戶端執(zhí)行那就會出現(xiàn)多次消費問題,也就是資源爭搶,這里我們可以使用lua腳本來執(zhí)行保證原子性,即獲取后刪除,其值和時間戳保存到內(nèi)存中,通過后端程序控制時間消費。當(dāng)然,問題還會有的,我們在下一篇來講解Redis消息隊列的發(fā)布訂閱模式。

?

一名正在搶救的coder

筆名:mangolove

CSDN地址:https://blog.csdn.net/mango_love

GitHub地址:https://github.com/mangoloveYu

總結(jié)

以上是生活随笔為你收集整理的Redis实现消息队列之生产消费模式的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。