日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

聊聊Redis消息队列-实现异步秒杀

發布時間:2023/12/29 数据库 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 聊聊Redis消息队列-实现异步秒杀 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、前言

消息隊列(Message Queue), 字面意思就是存放消息的隊列,最簡單的消息隊列模型包括3個角色:

  • 消息隊列:存儲和管理消息,也被稱為消息代理(Message Broker);
  • 生產者:發送消息到消息隊列;
  • 消費者:從消息隊列獲取消息并處理消息。

    Redis提供了三種不同的方式來實現消息隊列:
  • list結構:基于List結構模擬消息隊列;
  • PubSub: 基本的點對點消息模型;
  • Stream: 比較完善的消息隊列模型

二、基于List結構模擬消息隊列

消息隊列(Message Queue),字面意思就是存放消息的隊列。而Redis的list數據結構是一個雙向鏈表,很容易模擬出隊列效果。
隊列是入口和出口不在一邊,因此我們可以利用:LPUSH結合RPOP、或者RPUSH結合LPOP來實現;
不過要注意的是,當隊列中沒有消息時RPOP或LPOP操作會返回null,并不像JVM的阻塞隊列那樣會阻塞并等待消息。因此這里應該適用BRPOP或者BLPOP來實現阻塞效果。

2.1 基于List的消息隊列有哪些優缺點?

  • 優點:
    • 利用Redis存儲,不受限于JVM內存上限;
    • 基于Redis的持久化機制,數據安全性有保證;
    • 可以滿足消息有序性;
  • 缺點:
    • 無法避免消息丟失;
    • 只支持消費者

三、基于PubSub的消息隊列

PubSub(發布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費者可以訂閱一個或多個channel,生產者向對應channel發送消息后,所有訂閱者都能收到相關消息。

  • SUBSCCRIBE channel [channel]: 訂閱一個或多個頻道;
  • PUBLISH channel msg: 向一個頻道發送消息;
  • PSUBSCRIBE pattern[pattern]: 訂閱與pattern格式匹配的所有頻道

3.1 基于PubSub的消息隊列有哪些優缺點?

  • 優點:
    • 采用發布訂閱模型,支持多生產、多消息;
  • 缺點:
    • 不支持數據持久化;
    • 無法避免消息丟失;
    • 消息堆積有上限,超出時數據丟失

四、基于Stream的消息隊列

Stream 是 Redis 5.0 引入的一種新數據類型,可以實現一個功能非常完善的消息隊列;

  • 發送消息的命令:
  • 讀取消息的方式之一:XREAD
    • 例如:適用XREAD讀取第一個消息:
  • XREAD阻塞方式,讀取最新的消息:

    在業務開發中,我們可以循環的調用XREAD阻塞方式來查詢最新消息,從而實現持續監聽隊列的效果,偽代碼如下:

4.1 STREAM類型消息隊列的XREAD命令特點:

  • 消息可回溯;
  • 一個消息可以被多個消費者讀取;
  • 可以阻塞讀取;
  • 有消息漏讀的風險

五、基于Stream的消息隊列-消費者組

消費者組(Consumer Group):將多個消費者劃分到一個組中,監聽同一個隊列。具備下列特點:

  • 消息分流:隊列中的消息會分流給組內的不同消費者,而不是重復的消費,從而加快消息處理的速度;
  • 消息標示:消費者組會維護一個標識,記錄最后一個被處理的消息,哪怕消費者宕機重啟,還會從標識之后讀取消息。確保每一個消息都會被消費;
  • 消息確認:消費者獲取消費后,消息處于pending狀態,并存入一個pending-list。當處理完成后需要通過XACK來確認消息,標記消息為已處理,才會從pending-list移除。
  • 通俗的講,就是多個消費者在一個隊列中處于競爭關系,多個消費者來處理隊列消息,加快消息處理的速度。而且消費者組會給消息加上一個標識,記錄最新讀到的消息。如果中途消息處理完未提交,消息還會進入pending狀態。進入pending-list中,不會造成數據的丟失。

    5.1 STREAM類型消息隊列的XREADGROUP命令特點:

    • 消息可回溯;
    • 可以多消費者爭搶消息,加快消費速度;
    • 可以阻塞讀取;
    • 沒有消息漏讀的風險;
    • 有消息確認機制,保證消息至少被消費一次

    總結

    六、案例

    基于Redis的Stream結構作為消息隊列,實現異步秒殺下單

    需求:
    ① 創建一個Stream類型的消息隊列,名為stream.orders;
    ② 修改之前的秒殺下單Lua腳本,在認定有搶購資格后,直接向stream.orders中添加消息,內容包含voucherId、userId、orderId;
    ③ 項目啟動時,開啟一個線程任務,嘗試獲取stream.orders中的消息,完成下單。

    • lua腳本
    local voucherId = ARGV[1] --1.2用戶id local userId = ARGV[2] --1.3訂單id local orderId = ARGV[3] --1.4--2.數據key --2.1庫存key local stockKey = "seckill:stock:" .. voucherId --2.2訂單key local orderKey = "seckill:order:" .. userId--3.腳本業務 local stock = redis.call('get', stockKey) local stockNumber = tonumber(stock) --3.1判斷庫存是否充足 get stock if (stockNumber <= 0) then--庫存不足返回1return 1 end --3.2判斷用戶是否下單 if (redis.call('sismember', orderKey, userId) == 1) then--存在,返回2return 2 end --3.3扣庫存incrby stockKey -1 redis.call('incrby', stockKey, -1) --3.4下單,保存用戶信息 sadd orderKey userId redis.call('sadd', orderKey, userId) --3.5 發送消息到redis stream隊列,xadd stream.orders * k1 v1 k2 v2...... redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId) return 0
    • 主要業務代碼
    private IVoucherOrderService proxy;private static final DefaultRedisScript<Long> SECKILL_SCRIPT;static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));SECKILL_SCRIPT.setResultType(Long.class);}private final BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();@PostConstructprivate void init() {SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());}private class VoucherOrderHandler implements Runnable {String queueName = "stream.orders";@Overridepublic void run() {while (true) {try {// 1.獲取消息隊列中的訂單信息 XREADGROUP GROUP q1 c1 COUNT 1 BLOCK 2000 STREAMS streams.orderList<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create(queueName, ReadOffset.lastConsumed()));// 2.判斷消息獲取是否成功if (CollectionUtils.isEmpty(list)) {// 如果獲取失敗,說明沒有消息,繼續下一次循環continue;}// 3. 解析消息中的訂單信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 3.如果獲取成功,可以下單handleVoucherOrder(voucherOrder);// 4.ACK確認 SACK streams.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);handlePendingList();}}}private void handlePendingList() {while (true) {try {// 1.獲取pending-list中的訂單信息 XREADGROUP GROUP q1 c1 COUNT 1 STREAMS streams.order 0List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1),StreamOffset.create(queueName, ReadOffset.from("0")));// 2.判斷消息獲取是否成功if (CollectionUtils.isEmpty(list)) {// 如果獲取失敗,說明pending-list沒有異常消息,結束循環break;}// 3. 解析消息中的訂單信息MapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> values = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);// 3.如果獲取成功,可以下單handleVoucherOrder(voucherOrder);// 4.ACK確認 SACK streams.orders g1 idstringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());} catch (Exception e) {log.error("處理訂單異常", e);try {TimeUnit.MILLISECONDS.sleep(20);} catch (InterruptedException ex) {ex.printStackTrace();}}}}}private void handleVoucherOrder(VoucherOrder voucherOrder) {//1.1獲取用戶idLong userId = voucherOrder.getUserId();//自己定義的SimpleRedisLock鎖類//SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);//使用的redisson獲取的鎖類//1.2創建鎖對象RLock lock = redissonClient.getLock("lock:order:" + userId);//1.3獲取鎖boolean isLock = lock.tryLock();//1.4判斷獲取鎖是否成功if (!isLock) {//獲取鎖失敗,返回錯誤log.error("不允許重復下單");return;}try {// 獲取代理對象(事務)proxy.createVoucherOrder(voucherOrder);} finally {//3.釋放鎖lock.unlock();}}@Overridepublic Result seckillVoucher(Long voucherId) {// 獲取用戶Long userId = UserHolder.getUser().getId();// 獲取訂單idLong orderId = redisIdWorker.nextId("orderId");// 1.執行lua腳本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),voucherId.toString(),userId.toString(),String.valueOf(orderId));// 2. 判斷結果是否為0int r = Objects.requireNonNull(result).intValue();if (r != 0) {// 2.1 不為0,代表沒有購買資格return Result.fail(r == 1 ? "庫存不足" : "不能重復下單");}// 2.2 為0,有購買資格,把下單信息保存到阻塞隊列// TODO 保存阻塞隊列VoucherOrder voucherOrder = VoucherOrder.builder().id(orderId).userId(userId).voucherId(voucherId).build();// 放入阻塞隊列orderTasks.add(voucherOrder);// 3. 獲取代理對象proxy = (IVoucherOrderService) AopContext.currentProxy();// 4. 返回訂單idreturn Result.ok(orderId);}

    總結

    以上是生活随笔為你收集整理的聊聊Redis消息队列-实现异步秒杀的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。