日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

使用Redis 实现消息队列

發布時間:2024/4/30 数据库 68 豆豆
生活随笔 收集整理的這篇文章主要介紹了 使用Redis 实现消息队列 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一 、為什么要用Redis實現輕量級MQ?

  • MQ的主要作用:
  • 應用解耦
  • 異步化消息
  • 流量削峰填谷
  • 目前使用比較多的是ActiveMQ 、 RabbitMQ 、 ZeroMQ 、 Kafka 、 MetaMQ 、 RocketMQ等
  • 在業務實現過程中 , 就算沒有大量的流量 , 解耦和異步化也是處處可用 , 此時MQ就顯得尤為重要 。 但與此同時MQ也是一個蠻重的組件,例如我們如果用RabbitMQ就必須為它搭建一個服務器,同時如果要考慮可用性,就要為服務端建立一個集群,而且在生產如果有問題也需要查找功能。在中小型業務的開發過程中,可能業務的其他整個實現都沒這個重。過重的組件服務會成倍增加工作量。所幸的是,Redis提供的list數據結構非常適合做消息隊列。但是如何實現即時消費?如何實現ack機制?這些是實現的關鍵所在。
  • 二、 如何實現即使消費

  • 網上所流傳的方法是使用Redis中list的操作BLPOP或BRPOP,即列表的阻塞式(blocking)彈出。讓我們來看看阻塞式彈出的使用方式:

    BRPOP key [key ...] timeout 此命令的說明是:1、當給定列表內沒有任何元素可供彈出的時候,連接將被 BRPOP 命令阻塞,直到等待超時或發現可彈出元素為止。 2、當給定多個key參數時,按參數 key 的先后順序依次檢查各個列表,彈出第一個非空列表的尾部元素。 另外,BRPOP 除了彈出元素的位置和 BLPOP 不同之外,其他表現一致。
  • 以此看來 , 列表的阻塞式彈出有兩個特點:
  • 如果list中沒有任務時, 該連接將會被阻塞
  • 連接的阻塞有一個超時時間 , 當超時時間設置為0時 , 即可無線等待, 直到彈出消息
  • 由此看來此方式是可行的的 , 但是此方式為傳統的觀察者模式 , 業務簡單可用 , 如果A的任務由B去執行 沒有問題 , 但是如果A 、 B 發布的任務要C 、 D 去都能執行 , 這個方法就相形見絀了 , 這時就要用到發布\訂閱模式 , 使業務系統更加清晰 。
  • 好在Redis也支持Pub/Sub(發布 / 訂閱) 。 在消息A入隊的同時發布通知到頻道Channel , 此時已經訂閱channel的worker就收到了通知 , 知道了list中有消息A ,就可以獲取并消費了 。
  • 三、 及時消費實例

    • 示例場景為:worker要做同步文件功能,等到有文件生成時立馬同步。

      首先開啟一個線程代表worker,來訂閱頻道channel:@Service public class SubscribeService {@Resourceprivate RedisService redisService;@Resourceprivate SynListener synListener;//訂閱者@PostConstructpublic void subscribe() {new Thread(new Runnable() {@Overridepublic void run() {LogCvt.info("服務已訂閱頻道:{}", channel);redisService.subscribe(synListener, channel);}}).start();} } 代碼中的SynListener即為所聲明的訂閱者,channel為訂閱的頻道名稱,具體的訂閱邏輯如下:@Service public class SynListener extends JedisPubSub {@Resourceprivate DispatchMessageHandler dispatchMessageHandler;@Overridepublic void onMessage(String channel, String message) {LogCvt.info("channel:{},receives message:{}",channel,message);try {//處理業務(同步文件)dispatchMessageHandler.synFile();} catch (Exception e) {LogCvt.error(e.getMessage(),e);}} } 處理業務的時候,就去list中去消費消息:@Service public class DispatchMessageHandler {@Resourceprivate RedisService redisService;@Resourceprivate MessageHandler messageHandler;public void synFile(){while(true){try {String message = redisService.lpop(RedisKeyUtil.syn_file_queue_key());if (null == message){break;}Thread.currentThread().setName(Tools.uuid());// 隊列數據處理messageHandler.synfile(message);} catch (Exception e) {LogCvt.error(e.getMessage(),e);}}}

    四、 如何實現ack機制?

  • ack , 即小氣確認機制
  • 首先看看rabbitMQ 的ack機制:
  • publistener把消息通知給Consumer , 如果在consumer已處理完任務 , 那么他將向Broker發送ack消息 , 告知某條消息已經被成功處理 , 可以從隊列中移除 。 如果consunmer么有發送回ack消息 , 那么Broker會認為消息處理失敗 , 會將此消息及后續消息分發給其他consumer進行處理(redeliver flag 設置為true )
  • 這種機制和TCP/IP協議確認連接類似 , 不同的是TCP/IP確立連接需要經過三次握手,而RabbitMQ只需要一次ACK。
  • 值得注意的是RabbitMQ當且僅當檢測到ACK消息未發出且Consumer的連接終止時才會將消息重新分發給其他Consumer,因此不需要擔心消息處理時間過長而被重新分發的情況。
  • 那么在我們用Redis實現消息隊列的ack機制的時候該怎么做呢?
  • work處理失敗之后 , 要回滾消息到原始pending隊列
  • 加入worker掛掉 , 也要回滾消息到原始pending隊列
  • 五 、 實現方案(主要解決worker掛掉的情況)

  • 維護兩個隊列: pending隊列和doing表(hash表)。
  • workers定義為ThreadPool。
  • 由pending隊列出隊后,workers分配一個線程(單個worker)去處理消息——給目標消息append一個當前時間戳和當前線程名稱,將其寫入doing表,然后該worker去消費消息,完成后自行在doing表擦除信息。
  • 啟用一個定時任務,每隔一段時間去掃描doing隊列,檢查每隔元素的時間戳,如果超時,則由worker的ThreadPoolExecutor去檢查線程是否存在,如果存在則取消當前任務執行,并把事務rollback。最后把該任務從doing隊列中pop出,再重新push進pending隊列。
  • 在worker的某線程中,如果處理業務失敗,則主動回滾,并把任務從doing隊列中移除,重新push進pending隊列。
  • 六 、 總結

  • Redis作為消息隊列是有很大局限性的。因為其主要特性及用途決定它只能實現輕量級的消息隊列。寫在最后:沒有絕對好的技術,只有對業務最友好的技術,謹此獻給所有developer。
  • 總結

    以上是生活随笔為你收集整理的使用Redis 实现消息队列的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。