计算发送延时与传播延迟_消息队列——延时消息应用解析及实践
前言
消息隊列服務相信大家一定都不陌生了,在很多應用系統中,都有一些場景會使用到消息隊列服務,簡單來說,我們可以把消息隊列比作是一個存放消息的容器,上游發送端將消息發送到消息隊列,下游消費端從消息隊列里消費消息。消息隊列是分布式系統中重要的組件,核心作用可以幫助我們實現異步、解耦以及削峰,從而提高系統性能和穩定性。
在大部分場景下業務系統如果只需要實現異步解耦、削峰填谷等能力,常規的普通消息就可以滿足此類需求。除此之外,在某些特殊的業務場景中,普通消息類型存在無法滿足需求的情況。這就需要消息隊列服務本身支持一些特殊的消息類型,或者開發者通過開發一些定制化的代碼實現目的。這里我們列舉在使用消息隊列過程中幾種特殊場景的例子:
順序消費場景
生產者按照一定的先后順序發布消息,消費者按照既定的先后順序消費消息,即先發布的消息一定會先被客戶端消費。
分布式事務場景
分布式架構下,隨著系統的演進,數據庫也進行了垂直拆分,如果選擇使用消息隊列進行上下游解耦的話,生產者和消費者需要保證數據一致性。
延時消費場景
生產者將消息發送到消息隊列后,并不期望立馬投遞這條消息,而是推遲到某個時間點之后將消息投遞給消費者進行消費。
對于順序消息和事務消息,這里就不進行詳細介紹了,大家有興趣可以自行研究,本文后續內容會和大家一起詳細討論下延時消息更多的細節及應用場景。
延時消息介紹
延時(定時)消息的特點就是發送者成功發送一條消息后,這條消息并不會馬上被消費者消費,而是在某個特定的時間或者延遲一段時間后,消息才被消費者可見并進行后續的消費,延時消息整個生命周期可以用如下示意圖來表示:
延時消息應用場景
交易場景
在生產者和消費者有時間窗口的要求下,我們可以考慮使用延時消息。如在電商交易場景下,交易中超時未支付的訂單需要被關閉的場景,在訂單創建時會發送一條延時消息。這條消息將會在30分鐘以后投遞給消費者,消費者收到此消息后,需要判斷對應的訂單是否已完成支付;如支付未完成,則關閉訂單。
游戲場景
再比如在游戲社區里,游戲運營方經常會發起一些活動,玩家在活動期間內按照規則完成一系列任務,活動時間截止后,游戲后臺根據玩家完成任務的情況進行判定,發送系統通知或者進行rank排名并派發獎勵等。
此種場景也可以采用延時消息來實現,上游系統發布活動公告后,同時發送一條延時消息,延時時間設置為活動周期的時間。當活動截止后,下游系統可以隨即消費消息并進行相應的邏輯處理。
其他場景
同時延時消息也可以廣泛應用于信息提醒等比較通用的場景。
如何實現延時消息
介紹完延時消息的一些概念及應用場景后,我們接下來分析一下目前比較主流的幾款開源消息中間件對延時消息的支持情況以及實現方式。
Kafka
原生Kafka默認是不支持延時消息的,需要開發者自己實現一層代理服務,比如發送端將消息發送到延時Topic,代理服務消費延時Topic的消息然后轉存起來,代理服務通過一定的算法,計算延時消息所附帶的延時時間是否到達,然后將延時消息取出來并發送到實際的Topic里面,消費端從實際的Topic里面進行消費。
RabbitMQ
RabbitMQ實現延時消息有兩種方案,第一種是采用rabbitmq-delayed-message-exchange 插件實現,第二種則是利用DLX(Dead Letter Exchanges)+ TTL(消息存活時間)來間接實現。大致的實現思路如下:
RocketMQ
開源RocketMQ支持延遲消息,但是不支持秒級精度。默認支持18個level的延遲消息,這是通過broker端的messageDelayLevel配置項確定的messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
消息隊列服務在啟動時,會創建一個內部topic:SCHEDULE_TOPIC_XXXX,根據延遲level的個數,創建對應數量的隊列。生產者發送消息時可以設置延時等級,示例代碼:
Message msg=new Message(); msg.setTopic("TopicA"); msg.setBody("this is a delay message".getBytes()); //設置延遲level為5,對應延遲1分鐘 msg.setDelayTimeLevel(5); producer.send(msg);發送的消息會暫存在Broker對應的內部topic中,再通過定時任務從內部topic中拉取數據,如果延遲時間到了,就會把消息轉發到目標topic下,消費者從目標topic消費消息。
阿里云消息隊列RocketMQ版
通過上一章節的討論,我們可以看出目前幾款主流的開源消息隊列服務,在支持延時消息的場景下或多或少有些不完美的地方。主要體現在以下幾點:
那么有沒有一款消息隊列服務,能夠完美的支持延時(定時)消息。本節我們將介紹阿里云消息隊列RocketMQ版。
阿里云消息隊列RocketMQ版基于Apache RocketMQ構建的低延遲、高并發、高可用、高可靠的分布式消息中間件。消息隊列RocketMQ版既可為分布式應用系統提供異步解耦和削峰填谷的能力,同時也具備互聯網應用所需的海量消息堆積、高吞吐、可靠重試等特性。同時支持豐富的消息類型包括普通消息、順序消息、事務消息以及我們本文討論的延時消息。接下來我們看下阿里云RocketMQ為延時消息提供的能力及優勢:
使用阿里云消息隊列RocketMQ版收發延時(定時)消息,只需要在控制臺創建Topic的時候選擇定時/延時消息類型,既可以使用TCP或者http協議進行消息收發。
控制臺創建定時/延時Topic
Java語言示例代碼(TCP協議)
- 發送定時消息
- 發送延時消息
同時訂閱延時消息的邏輯無需任何改造,完全可以按照訂閱普通消息的方式,沒有任何的代碼侵入性。
結束語
到此我們討論了延時消息的特性、應用場景,對比了各類消息隊列對延時消息的支持情況,同時也向大家介紹了阿里云消息隊列RocketMQ版。我們在對消息中間件進行選型時,也會考慮到多方面的因素。除了消息中間件本身所能提供的能力外,也包括服務性能、穩定性、可擴展能力,以及需要結合開發團隊自身的技術棧等情況。最后如果大家想了解更多阿里云消息隊列RocketMQ版。
作者:阿里云解決方案架構師 鹿玄
原文鏈接
本文為阿里云原創內容,未經允許不得轉載
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的计算发送延时与传播延迟_消息队列——延时消息应用解析及实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: eas报错日记_金蝶EAS抓取性能日志说
- 下一篇: 天然气表怎么看多少方_宝宝奶粉的的营养成