Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)
一.消息隊列
消息隊列:分布式系統必備的一個基礎軟件,能支持組件通信消息的快速讀寫
Redis本身支持數據的快速訪問,滿足消息隊列的讀寫性能需求
二.Redis適合做消息隊列嗎?
消息隊列的消息存取需求
消息隊列存取消息的過程
- 在分布式系統中,兩個組件要基于消息隊列進行通信,一個組件就會把要處理的數據以消息的形式傳遞給消息隊列,然后這個組件就可以繼續執行其他操作;
- 遠端的另一個組件從消息隊列中把消息讀取出來,在本地進行處理。
需求:
- 組件1需要對采集到的數據進行求和計算,并寫入數據庫;
- 消息到達速度很快,組件1沒有辦法及時既做采集又做計算,并寫入數據庫。
解決方案:
消息隊列:
- 組件1把數據x和y保存為JSON格式的消息,再把它發送到消息隊列,這樣就可以繼續接受新的數據。
- 組件2從消息隊列中 把數據讀取出來在服務器2上進行求和計算上,再寫入數據庫。
通用的消息隊列的架構模型:
? ? ??
消息隊列存取消息時候,必須要滿足的三個需求:
- ? ? 消息順序性?
- ? ? 消息冪等性
- ? ? 保證消息的可靠性
消息的順序性
? ? ?消息順序被消費者異步處理,但是消費者仍然按照生產者發送消息的順序來處理消息,避免后被發送的消息先被處理了。
? ? ?需求:對于消息順序性的場景來看,一旦出現消息亂序處理時,會導致業務邏輯被錯誤執行,給業務方造成損失。
重復消息處理
? ? ?消費者從 消息隊列讀取消息時,有時候會因為網絡堵塞出現消息重傳的情況。此時,消費者可能會收到多條重復消息。對于重復消息,消費者如果多次處理的話,可能造成一個業務邏輯被多次執行,如果業務邏輯正好要修改數據,就會出現數據被多次修改的問題。
消息可靠性
? ? ? 消費者在處理消息的時候,可能出現因為故障 或者宕機導致消息沒有處理完就丟失的情況。當消費者重啟時候,可以重新讀取消息再次進行處理,否則就會 出現消息漏處理的問題。
Redis如何實現消息隊列的需求
? ? ?基于List消息隊列解決方案
? ? ?List本身就是按照先進先出的順序對數據進行存取,所以如果使用List作為消息隊列保存 消息的話,就可以滿足消息的順序性。
? ? 生產者使用LPUSH命令要把發送的消息依次寫入list,消費者通過RPOP命令從LIST的另一端按照消息的寫入順序,依次讀取消息并處理。? ?
? ?存在問題:
? ? ?生產者往list寫入數據時,List并不會主動通知消費者有新消息寫入,如果消費者想要及時處理消息,就需要程序不斷調用RPOP命令(比如使用一個while(1)循環),如果新消息寫入,RPOP就會返回結果,否則,RPOP命令返回空值,再繼續循環。
? ? ?危害:
? ? ? ? 沒有新消息寫入LIST消費者也要不停的調用RPOP命令,這就會導致消費者程序cpu一直消耗在執行RPOP命令上,帶來不必要的性能損失。
? ? 解決:
? ? ? ? ?Redis提供了BRPOP命令。BRPOP命令,也稱為阻塞式讀取,客戶端在沒有讀取到隊列數據時,自動阻塞,知道有新的數據寫入隊列,再開始讀取新數據,和消費者程序在自己不停調用RPOP命令相比,這種方式能節省CPU開銷。
? ? ? ??
重復消息的處理:消息的冪等性
? ? ? ?消費者程序本身可以對重復消息進行判斷。
? ? ? 消息隊列要能給每個消息提供全局唯一的ID號;另一方面,消費者程序要把已經處理過的消息ID記錄下來。當收到一條消息后,消費者程序可以對比收到的消息ID和記錄處理過的消息ID。來判斷當前收到的消息有么有經過處理。
? ? ?如果已經處理 過了就不再處理了。這種處理特性被稱為消息 冪等性。
? ? ?冪等性:對于同一消息,消費者收到生成一次的處理結果和收到多次的處理結果是一致的。
不過List本身不會為每個消息生成ID號的,所以,消息的全局唯一ID號就需要生產者程序發送消息前自行生成,生成之后,我們在用LPUSH命令把消息插入List中,需要在消息中包含這個全局唯一ID。
消息可靠性:
? ? ? List 類型是如何保證消息可靠性---?備份
? ? ?背景:? 消費者List中讀取一條消息后,List就不會存留這條消息,所以如果消費者程序在處理消息的過程中出現了故障或者宕機,就會導致消息沒有處理完成,那么消費者程序再次啟動就會導致消息丟失。
? ? 解決方案:為了存留消息,list提供了BRPOPLUSH命令,這個命令的作用就是讓消費者從一個List中讀取消息,同時Redis會把這個消息再插入到另一個List(可以叫作備份 List)留存。
? ? ? 如果消費者程序讀取了消息但是沒能正常處理,等它重啟以后就可以從備份List中重新讀取消息并進行處理。
? ? ? 生產者消息發送很快,而消費者處理消息的速度緩慢,這就導致List中消息堆積的很多,給Redis內存帶來壓力。
? ? ?啟動多個消費者程序組成消費組,一起分擔處理 List中消息的消息。但是List類型并不支持消費組的實現。
基于Stream消息隊列解決方案
streams是Redis專門為消息隊列設計 的數據類型:
- XADD插入消息,保證有序,可以自動生成全局唯一ID;
- XREAD用于讀取消息,可以按ID讀取數據;
- XREADGROUP按消費組的形式讀取消息;
- XPENDING和XACK:?XPENDING查詢每個消費組內所有消費者已讀取但是尚未確認消息,ASCK命令用于向消息隊列確認消息處理已經完成。
XADD命令
可以往消息隊列中插入新消息,消息的格式 是鍵-值對形式。對于插入的每一條消息,Streams可以自動為其生成一個全局唯一ID。
XADD mqstream * repo 5 "1599203861727-0"可以往名稱為mqstream的消息隊列插入一條消息,消息的鍵為 repo, 值為5;
消息隊列中的* ,表示讓Redis為插入數據自動生成一個全局唯一的ID,例如"1599203861727-0"
也可以自行設定一個ID號,保證這個ID號是全局唯一的就行。不過使用*號會更加方便高效。
消息的全局唯一ID由兩部分組成
- ? ?第一部分"1599203861727"是指當前時間戳 毫秒級
- ? ?第二部分表示插入消息在當前毫秒內的消息序列,這是從0開始編號的,
- ? “1599203861727-0”就表示在“1599203861727”毫秒內的第 1 條消息。
XREAD 命令
? ? ? ?使用XREAD命令從消息隊列讀取
? ? ? ? XREAD在讀取消息時候,可以指定一個消息ID,并從這個消息ID的下一條消息開始進行讀取。例如我們可以執行下面的命令,從ID號為 1599203861727-0 的消息開始,讀取后續的所有消息:
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0 1) 1) "mqstream"2) 1) 1) "1599274912765-0"2) 1) "repo"2) "3"2) 1) "1599274925823-0"2) 1) "repo"2) "2"3) 1) "1599274927910-0"2) 1) "repo"2) "1"消息者也可以在調用XREAD時設定block配置項,實現類似于BRPOP的阻塞讀取操作。
當消息隊列中沒有消息時,一旦設置了block配置項,XREAD就會阻塞;
阻塞的時長可以在block配置項進行設置。
XREAD block 10000 streams mqstream $ (nil) (10.00s)? ? ? ?,命令最后的$符號表示讀取最新消息,同時設置block 10000配置項,1000的單位是毫秒,表示XREAD 在讀取最新消息時,如果沒有消息到來,XREAD 將阻塞 10000 毫秒(即 10 秒),然后再返回。上面命令中XREAD執行后,消息隊列命令中mqstream 中一直沒有消息XREAD 在 10 秒后返回空值(nil)。
? ? ?
消費組
? ? ??Stream本身可以使用XGROUP創建消費組,創建消費組后,Stream可以使用XREADGROUP命令讓消費組內的消費者讀取消息
? ? ??
XGROUP create mqstream group1 0 ok? ?我們再執行一段命令,讓GROUP1消費組中的消費者consumer1 從 mqstream 中讀取所有消息
XREADGROUP group group1 cinsumer1 streams mqstream > 1) 1) "mqstream"2) 1) 1) "1599203861727-0"2) 1) "repo"2) "5"2) 1) "1599274912765-0"2) 1) "repo"2) "3"3) 1) "1599274925823-0"2) 1) "repo"2) "2"4) 1) "1599274927910-0"2) 1) "repo"2) "1"讓group1消費組里的消費者consumer1從mqstream中讀取所有消息,
命令">"表示從第一天尚未被消費的消息開始讀取。
因為在consumer1讀取消息前,group1并沒有其他消費者讀取過消息,所以consumer1就得到了mqstream消息隊列中的所有消息。
消息隊列中的消息一旦被消費組里的一個消息讀取了,就不能再被該消費組內的其他消費者讀取。
我們繼續執行下面命令
XREADGROUP group group1 consumer2 streams mqstream 0 1) 1) "mqstream"2) (empty list or set)比如說,我們執行完剛才的 XREADGROUP 命令后,再執行下面的命令,讓 group1 內的 consumer2 讀取消息時,consumer2 讀到的就是空值,因為消息已經被 consumer1 讀取完了?
消費組的目的?
? ? 讓組內多個消費者共同分擔讀取消息,通常會讓每個消費者讀取部分消息,從而實現讓組內的多個消費者共同分擔讀取消息,實現消息讀取負載在多個消費者間是均衡分布的。例如,我們執行下列命令,讓 group2 中的 consumer1、2、3 各自讀取一條消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream > 1) 1) "mqstream"2) 1) 1) "1599203861727-0"2) 1) "repo"2) "5"XREADGROUP group group2 consumer2 count 1 streams mqstream > 1) 1) "mqstream"2) 1) 1) "1599274912765-0"2) 1) "repo"2) "3"XREADGROUP group group2 consumer3 count 1 streams mqstream > 1) 1) "mqstream"2) 1) 1) "1599274925823-0"2) 1) "repo"2) "2"保證消費者在發生故障或者宕機再次重啟時,讓可以讀取未處理完的消息,stream會自動使用內部隊列(PENDING List)留存消費組里 每個消費者讀取的消息;
直到消費者使用XACK命令通知Streams消息已經被處理完成。
如果消費者沒有成功處理消息,他就不會給Stream發送XACK命令,消息仍然會留存。
此時消費者可以在重啟后,用XPENDING 命令查看已讀取、但尚未確認處理完成的消息。
XPEBDING mqstream group2 1) (integer) 3 2) "1599203861727-0" 3) "1599274925823-0" 4) 1) 1) "consumer1"2) "1"2) 1) "consumer2"2) "1"3) 1) "consumer3"2) "1"查看group2中各個消費者已讀取,但是尚未確認的消息個數。其中,XPENDING返回結果的第二行第三行分別表示group2中所有消費者讀取的消息最小ID和最大ID。
XACK mqstream group2 1599274912765-0 (integer) 1 XPENDING mqstream group2 - + 10 consumer2 (empty list or set)consumer2 就可以使用 XACK 命令通知 Streams,然后這條消息就會被刪除。當我們再使用 XPENDING 命令查看時,就可以看到,consumer2 已經沒有已讀取、但尚未確認處理的消息了。
| 基于List | 基于Streams | |
| 消息順序性 | LPUSH/RPOP | XADD/XREAD |
| 阻塞讀取 | BRPOP | XREAD block |
| 重復消息處理 | 生產者自行實現全局唯一ID | Streams自動生成全局唯一ID |
| 消息可靠性 | BRPOPLPUSH | 使用PENDING List自動存留消息,使用XPENDING查看,使XACK確認 |
| 適用場景 | Redis 5.0前版本 部署環境消息總量小 | Redis 5.0以后版本 部署環境消息總量大,需要以消費組的形式讀取數據 |
總結
以上是生活随笔為你收集整理的Redis核心技术与实战-学习笔记(十五):消息队列(Redis的解决方案)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 52单片机IO口输出-蜂鸣器(硬核)
- 下一篇: 想进阿里必须啃透的 13 道 MySQL