使用Redis 实现消息队列
生活随笔
收集整理的這篇文章主要介紹了
使用Redis 实现消息队列
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一 、為什么要用Redis實現輕量級MQ?
二、 如何實現即使消費
網上所流傳的方法是使用Redis中list的操作BLPOP或BRPOP,即列表的阻塞式(blocking)彈出。讓我們來看看阻塞式彈出的使用方式:
BRPOP key [key ...] timeout 此命令的說明是:1、當給定列表內沒有任何元素可供彈出的時候,連接將被 BRPOP 命令阻塞,直到等待超時或發現可彈出元素為止。 2、當給定多個key參數時,按參數 key 的先后順序依次檢查各個列表,彈出第一個非空列表的尾部元素。 另外,BRPOP 除了彈出元素的位置和 BLPOP 不同之外,其他表現一致。三、 及時消費實例
示例場景為:worker要做同步文件功能,等到有文件生成時立馬同步。
首先開啟一個線程代表worker,來訂閱頻道channel:@Service public class SubscribeService {@Resourceprivate RedisService redisService;@Resourceprivate SynListener synListener;//訂閱者@PostConstructpublic void subscribe() {new Thread(new Runnable() {@Overridepublic void run() {LogCvt.info("服務已訂閱頻道:{}", channel);redisService.subscribe(synListener, channel);}}).start();} } 代碼中的SynListener即為所聲明的訂閱者,channel為訂閱的頻道名稱,具體的訂閱邏輯如下:@Service public class SynListener extends JedisPubSub {@Resourceprivate DispatchMessageHandler dispatchMessageHandler;@Overridepublic void onMessage(String channel, String message) {LogCvt.info("channel:{},receives message:{}",channel,message);try {//處理業務(同步文件)dispatchMessageHandler.synFile();} catch (Exception e) {LogCvt.error(e.getMessage(),e);}} } 處理業務的時候,就去list中去消費消息:@Service public class DispatchMessageHandler {@Resourceprivate RedisService redisService;@Resourceprivate MessageHandler messageHandler;public void synFile(){while(true){try {String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());if (null == message){break;}Thread.currentThread().setName(Tools.uuid());// 隊列數據處理messageHandler.synfile(message);} catch (Exception e) {LogCvt.error(e.getMessage(),e);}}}
四、 如何實現ack機制?
五 、 實現方案(主要解決worker掛掉的情況)
六 、 總結
總結
以上是生活随笔為你收集整理的使用Redis 实现消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Linux 中su 与su - 的区别
- 下一篇: django.core.exceptio