日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) >

Redis 使用 List 实现消息队列的利与弊

發(fā)布時(shí)間:2024/8/23 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Redis 使用 List 实现消息队列的利与弊 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

作者 | 碼哥字節(jié)

來(lái)源 |?碼哥字節(jié)

分布式系統(tǒng)中必備的一個(gè)中間件就是消息隊(duì)列,通過(guò)消息隊(duì)列我們能對(duì)服務(wù)間進(jìn)行異步解耦、流量消峰、實(shí)現(xiàn)最終一致性。

目前市面上已經(jīng)有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人會(huì)問(wèn):“Redis 適合做消息隊(duì)列么?”

在回答這個(gè)問(wèn)題之前,我們先從本質(zhì)思考:

  • 消息隊(duì)列提供了什么特性?

  • Redis 如何實(shí)現(xiàn)消息隊(duì)列?是否滿足存取需求?

今天,碼哥結(jié)合消息隊(duì)列的特點(diǎn)一步步帶大家分析使用 Redis 的 List 作為消息隊(duì)列的實(shí)現(xiàn)原理,并分享如何把 SpringBoot 與 Redission 整合運(yùn)用到項(xiàng)目中。

什么是消息隊(duì)列

消息隊(duì)列是一種異步的服務(wù)間通信方式,適用于分布式和微服務(wù)架構(gòu)。消息在被處理和刪除之前一直存儲(chǔ)在隊(duì)列上。

每條消息僅可被一位用戶處理一次。消息隊(duì)列可被用于分離重量級(jí)處理、緩沖或批處理工作以及緩解高峰期工作負(fù)載。

消息隊(duì)列
  • Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到 Broker;

  • Broker:消息處理中心。負(fù)責(zé)消息存儲(chǔ)、確認(rèn)、重試等,一般其中會(huì)包含多個(gè) queue;

  • Consumer:消息消費(fèi)者,負(fù)責(zé)從 Broker 中獲取消息,并進(jìn)行相應(yīng)處理;

消息隊(duì)列的使用場(chǎng)景有哪些呢?

消息隊(duì)列在實(shí)際應(yīng)用中包括如下四個(gè)場(chǎng)景:

  • 應(yīng)用耦合:發(fā)送方、接收方系統(tǒng)之間不需要了解雙方,只需要認(rèn)識(shí)消息。多應(yīng)用間通過(guò)消息隊(duì)列對(duì)同一消息進(jìn)行處理,避免調(diào)用接口失敗導(dǎo)致整個(gè)過(guò)程失敗;

  • 異步處理:多應(yīng)用對(duì)消息隊(duì)列中同一消息進(jìn)行處理,應(yīng)用間并發(fā)處理消息,相比串行處理,減少處理時(shí)間;

  • 限流削峰:廣泛應(yīng)用于秒殺或搶購(gòu)活動(dòng)中,避免流量過(guò)大導(dǎo)致應(yīng)用系統(tǒng)掛掉的情況;

  • 消息驅(qū)動(dòng)的系統(tǒng):系統(tǒng)分為消息隊(duì)列、消息生產(chǎn)者、消息消費(fèi)者,生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,消費(fèi)者(可能有多個(gè))負(fù)責(zé)對(duì)消息進(jìn)行處理;

消息隊(duì)列滿足哪些特性

消息有序性

消息是異步處理的,但是消費(fèi)者需要按照生產(chǎn)者發(fā)送消息的順序來(lái)消費(fèi),避免出現(xiàn)后發(fā)送的消息被先處理的情況。

重復(fù)消息處理

生產(chǎn)者可能因?yàn)榫W(wǎng)絡(luò)問(wèn)題出現(xiàn)消息重傳導(dǎo)致消費(fèi)者可能會(huì)收到多條重復(fù)消息。

同樣的消息重復(fù)多次的話可能會(huì)造成一業(yè)務(wù)邏輯多次執(zhí)行,需要確保如何避免重復(fù)消費(fèi)問(wèn)題。

可靠性

一次保證消息的傳遞。如果發(fā)送消息時(shí)接收者不可用,消息隊(duì)列會(huì)保留消息,直到成功地傳遞它。

當(dāng)消費(fèi)者重啟后,可以繼續(xù)讀取消息進(jìn)行處理,防止消息遺漏。

List 實(shí)現(xiàn)消息隊(duì)列

Redis 的列表(List)是一種線性的有序結(jié)構(gòu),可以按照元素被推入列表中的順序來(lái)存儲(chǔ)元素,能滿足「先進(jìn)先出」的需求,這些元素既可以是文字?jǐn)?shù)據(jù),又可以是二進(jìn)制數(shù)據(jù)。

LPUSH

生產(chǎn)者使用 LPUSH key element[element...] 將消息插入到隊(duì)列的頭部,如果 key 不存在則會(huì)創(chuàng)建一個(gè)空的隊(duì)列再插入消息。

如下,生產(chǎn)者向隊(duì)列 queue 先后插入了 「Java」「碼哥字節(jié)」「Go」,返回值表示消息插入隊(duì)列后的個(gè)數(shù)。

>?LPUSH?queue?Java?碼哥字節(jié)?Go (integer)?3

RPOP

消費(fèi)者使用 RPOP key 依次讀取隊(duì)列的消息,先進(jìn)先出,所以 「Java」會(huì)先讀取消費(fèi):

>?RPOP?queue "Java" >?RPOP?queue "碼哥字節(jié)" >?RPOP?queue "Go"

List隊(duì)列

實(shí)時(shí)消費(fèi)問(wèn)題

65 哥:這么簡(jiǎn)單就實(shí)現(xiàn)了么?

別高興的太早,LPUSH、RPOP 存在一個(gè)性能風(fēng)險(xiǎn),生產(chǎn)者向隊(duì)列插入數(shù)據(jù)的時(shí)候,List 并不會(huì)主動(dòng)通知消費(fèi)者及時(shí)消費(fèi)。

我們需要寫(xiě)一個(gè) while(true) 不停地調(diào)用 RPOP 指令,當(dāng)有新消息就會(huì)返回消息,否則返回空。

程序需要不斷輪詢并判斷是否為空再執(zhí)行消費(fèi)邏輯,這就會(huì)導(dǎo)致即使沒(méi)有新消息寫(xiě)入到隊(duì)列,消費(fèi)者也要不停地調(diào)用 RPOP 命令占用 CPU 資源。

65 哥:要如何避免循環(huán)調(diào)用導(dǎo)致的 CPU 性能損耗呢?

Redis 提供了 BLPOP、BRPOP 阻塞讀取的命令,消費(fèi)者在在讀取隊(duì)列沒(méi)有數(shù)據(jù)的時(shí)候自動(dòng)阻塞,直到有新的消息寫(xiě)入隊(duì)列,才會(huì)繼續(xù)讀取新消息執(zhí)行業(yè)務(wù)邏輯。

BRPOP?queue?0

參數(shù) 0 表示阻塞等待時(shí)間無(wú)無(wú)限制

重復(fù)消費(fèi)

  • 消息隊(duì)列為每一條消息生成一個(gè)「全局 ID」;

  • 生產(chǎn)者為每一條消息創(chuàng)建一條「全局 ID」,消費(fèi)者把一件處理過(guò)的消息 ID 記錄下來(lái)判斷是否重復(fù)。

其實(shí)這就是冪等,對(duì)于同一條消息,消費(fèi)者收到后處理一次的結(jié)果和多次的結(jié)果是一致的。

消息可靠性

65 哥:消費(fèi)者從 List 中讀取一條在消息處理過(guò)程中宕機(jī)了就會(huì)導(dǎo)致消息沒(méi)有處理完成,可是數(shù)據(jù)已經(jīng)沒(méi)有保存在 List 中了咋辦?

本質(zhì)就是消費(fèi)者在處理消息的時(shí)候崩潰了,就無(wú)法再還原消息,缺乏一個(gè)消息確認(rèn)機(jī)制。

Redis 提供了 RPOPLPUSH、BRPOPLPUSH(阻塞)兩個(gè)指令,含義是從 List 從讀取消息的同時(shí)把這條消息復(fù)制到另一個(gè) List 中(備份),并且是原子操作。

我們就可以在業(yè)務(wù)流程正確處理完成后再刪除隊(duì)列消息實(shí)現(xiàn)消息確認(rèn)機(jī)制。如果在處理消息的時(shí)候宕機(jī)了,重啟后再?gòu)膫浞?List 中讀取消息處理。

LPUSH?redisMQ?公眾號(hào)?碼哥字節(jié) BRPOPLPUSH?redisMQ?redisMQBack

生產(chǎn)者用 LPUSH 把消息插入到 redisMQ 隊(duì)列中,消費(fèi)者使用 BRPOPLPUSH 讀取消息「公眾號(hào)」,同時(shí)該消息會(huì)被插入到 「redisMQBack」隊(duì)列中。

如果消費(fèi)成功則把「redisMQBack」的消息刪除即可,異常的話可以繼續(xù)從 「redisMQBack」再次讀取消息處理。

redis消息確認(rèn)機(jī)制

需要注意的是,如果生產(chǎn)者消息發(fā)送的很快,而消費(fèi)者處理速度慢就會(huì)導(dǎo)致消息堆積,給 Redis 的內(nèi)存帶來(lái)過(guò)大壓力。

Redission 實(shí)戰(zhàn)

在 Java 中,我們可以利用 Redission 封裝的 API 來(lái)快速實(shí)現(xiàn)隊(duì)列,接下來(lái)碼哥基于 SpringBoot 2.1.4 版本來(lái)交大家如何整合并實(shí)戰(zhàn)。

詳細(xì) API 文檔大家可查閱:

https://github.com/redisson/redisson/wiki/7.-Distributed-collections

添加依賴

<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.16.7</version> </dependency>

添加 Redis 配置,碼哥的 Redis 沒(méi)有配置密碼,大家根據(jù)實(shí)際情況配置即可。

spring:application:name:?redissionredis:host:?127.0.0.1port:?6379ssl:?false

Java 代碼實(shí)戰(zhàn)

RBlockingDeque 繼承 java.util.concurrent.BlockingDeque ,在使用過(guò)程中我們完全可以根據(jù)接口文檔來(lái)選擇合適的 API 去實(shí)現(xiàn)業(yè)務(wù)邏輯。

主要方法如下:

碼哥采用了雙端隊(duì)列來(lái)舉例

@Slf4j @Service public?class?QueueService?{@Autowiredprivate?RedissonClient?redissonClient;private?static?final?String?REDIS_MQ?=?"redisMQ";/***?發(fā)送消息到隊(duì)列頭部**?@param?message*/public?void?sendMessage(String?message)?{RBlockingDeque<String>?blockingDeque?=?redissonClient.getBlockingDeque(REDIS_MQ);try?{blockingDeque.putFirst(message);log.info("將消息:?{}?插入到隊(duì)列。",?message);}?catch?(InterruptedException?e)?{e.printStackTrace();}}/***?從隊(duì)列尾部阻塞讀取消息,若沒(méi)有消息,線程就會(huì)阻塞等待新消息插入,防止?CPU?空轉(zhuǎn)*/public?void?onMessage()?{RBlockingDeque<String>?blockingDeque?=?redissonClient.getBlockingDeque(REDIS_MQ);while?(true)?{try?{String?message?=?blockingDeque.takeLast();log.info("從隊(duì)列?{}?中讀取到消息:{}.",?REDIS_MQ,?message);}?catch?(InterruptedException?e)?{e.printStackTrace();}}}

單元測(cè)試

@RunWith(SpringRunner.class) @SpringBootTest(classes?=?RedissionApplication.class) public?class?RedissionApplicationTests?{@Autowiredprivate?QueueService?queueService;@Testpublic?void?testQueue()?throws?InterruptedException?{new?Thread(()?->?{for?(int?i?=?0;?i?<?1000;?i++)?{queueService.sendMessage("消息"?+?i);}}).start();new?Thread(()?->?queueService.onMessage()).start();Thread.currentThread().join();}}

總結(jié)

可以使用 List 數(shù)據(jù)結(jié)構(gòu)來(lái)實(shí)現(xiàn)消息隊(duì)列,滿足先進(jìn)先出。為了實(shí)現(xiàn)消息可靠性,Redis 提供了 BRPOPLPUSH 命令是解決。

Redis 是一個(gè)非常輕量級(jí)的鍵值數(shù)據(jù)庫(kù),部署一個(gè) Redis 實(shí)例就是啟動(dòng)一個(gè)進(jìn)程,部署 Redis 集群,也就是部署多個(gè) Redis 實(shí)例。

而 Kafka、RabbitMQ 部署時(shí),涉及額外的組件,例如 Kafka 的運(yùn)行就需要再部署 ZooKeeper。相比 Redis 來(lái)說(shuō),Kafka 和 RabbitMQ 一般被認(rèn)為是重量級(jí)的消息隊(duì)列。

需要注意的是,我們要避免生產(chǎn)者過(guò)快,消費(fèi)者過(guò)慢導(dǎo)致的消息堆積占用 Redis 的內(nèi)存。

在消息量不大的情況下使用 Redis 作為消息隊(duì)列,他能給我們帶來(lái)高性能的消息讀寫(xiě),這似乎也是一個(gè)很好消息隊(duì)列解決方案。

大家覺(jué)得是否合適作為消息隊(duì)列呢?

總結(jié)

以上是生活随笔為你收集整理的Redis 使用 List 实现消息队列的利与弊的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。