日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RabbitMQ实现延迟消息

發布時間:2023/12/20 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RabbitMQ实现延迟消息 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

本文主要講解mall整合RabbitMQ實現延遲消息的過程,以發送延遲消息取消超時訂單為例。

文章目錄

    • 項目使用框架介紹
      • RabbitMQ
        • RabbitMQ的安裝和使用
      • Lombok
    • 業務場景說明
    • 整合RabbitMQ實現延遲消息
      • 在pom.xml中添加相關依賴
      • 修改SpringBoot配置文件
      • 添加消息隊列的枚舉配置類QueueEnum
      • 添加RabbitMQ的配置
        • 在RabbitMQ管理頁面可以看到以下交換機和隊列
        • 交換機及隊列說明
      • 添加延遲消息的發送者CancelOrderSender
      • 添加取消訂單消息的接收者CancelOrderReceiver
      • 添加OmsPortalOrderService接口
      • 添加OmsPortalOrderService的實現類OmsPortalOrderServiceImpl
      • 添加OmsPortalOrderController定義接口
    • 進行接口測試
      • 調用下單接口

項目使用框架介紹

RabbitMQ

RabbitMQ是一個被廣泛使用的開源消息隊列。它是輕量級且易于部署的,它能支持多種消息協議。RabbitMQ可以部署在分布式和聯合配置中,以滿足高規模、高可用性的需求。

RabbitMQ的安裝和使用

注意:Erlang版本與RabbitMQ版本有對應關系,版本不對應會導致RabbitMQ安裝不了

  • 安裝Erlang,下載地址:http://erlang.org/download/otp_win64_23.3.exe
  • Erlang需要配置環境變量,配置bin目錄所在路徑
  • 安裝RabbitMQ,下載地址:https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.16/rabbitmq-server-3.8.16.exe
  • 安裝完成后,進入RabbitMQ安裝目錄下的sbin目錄
  • 在地址欄輸入cmd并回車啟動命令行,然后輸入以下命令啟動管理功能:
  • rabbitmq-plugins enable rabbitmq_management

    如果出現:

    解決方法:
    將 C:\Users\Administrator.erlang.cookie 同步至C:\Windows\System32\config\systemprofile.erlang.cookie
    同時刪除:C:\Users\Administrator\AppData\Roaming\RabbitMQ目錄
    輸入命令:rabbitmq-plugins.bat enable rabbitmq_management ,出現下面信息表示插件安裝成功:

    輸入命令:rabbitmq-server.bat

  • 訪問地址查看是否啟動成功:http://localhost:15672/
  • 輸入賬號密碼并登錄:guest guest
  • 創建帳號并設置其角色為管理員:mall mall
  • 創建一個新的虛擬host為:/mall
  • 點擊mall用戶進入用戶配置頁面
  • 給mall用戶配置該虛擬host的權限
  • 至此,RabbitMQ的安裝和配置完成。

  • Lombok

    Lombok為Java語言添加了非常有趣的附加功能,你可以不用再為實體類手寫getter,setter等方法,通過一個注解即可擁有。

    注意:需要安裝idea的Lombok插件,并在項目中的pom文件中添加依賴。

    業務場景說明

    用于解決用戶下單以后,訂單超時如何取消訂單的問題。

    • 用戶進行下單操作(會有鎖定商品庫存、使用優惠券、積分一系列的操作);
    • 生成訂單,獲取訂單的id;
    • 獲取到設置的訂單超時時間(假設設置的為60分鐘不支付取消訂單);
    • 按訂單超時時間發送一個延遲消息給RabbitMQ,讓它在訂單超時后觸發取消訂單的操作;
    • 如果用戶沒有支付,進行取消訂單操作(釋放鎖定商品庫存、返還優惠券、返回積分一系列操作)。

    整合RabbitMQ實現延遲消息

    在pom.xml中添加相關依賴

    <!--消息隊列相關依賴--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--lombok依賴--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency>

    修改SpringBoot配置文件

    修改application.yml文件,在spring節點下添加RabbitMQ相關配置。

    rabbitmq:host: localhost # rabbitmq的連接地址port: 5672 # rabbitmq的連接端口號virtual-host: /mall # rabbitmq的虛擬hostusername: mall # rabbitmq的用戶名password: mall # rabbitmq的密碼publisher-confirms: true #如果對異步消息需要回調必須設置為true

    添加消息隊列的枚舉配置類QueueEnum

    用于延遲消息隊列及處理取消訂單消息隊列的常量定義,包括交換機名稱、隊列名稱、路由鍵名稱。

    package com.macro.mall.tiny.dto;import lombok.Getter;/*** 消息隊列枚舉配置* Created by macro on 2018/9/14.*/@Getterpublic enum QueueEnum {/*** 消息通知隊列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl隊列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交換名稱*/private String exchange;/*** 隊列名稱*/private String name;/*** 路由鍵*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;}}

    添加RabbitMQ的配置

    用于配置交換機、隊列及隊列與交換機的綁定關系。

    package com.macro.mall.tiny.config;import com.macro.mall.tiny.dto.QueueEnum;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/*** 消息隊列配置* Created by macro on 2018/9/14.*/@Configurationpublic class RabbitMqConfig {/*** 訂單消息實際消費隊列所綁定的交換機*/@BeanDirectExchange orderDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 訂單延遲隊列隊列所綁定的交換機*/@BeanDirectExchange orderTtlDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 訂單實際消費隊列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 訂單延遲隊列(死信隊列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后轉發的交換機.withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后轉發的路由鍵.build();}/*** 將訂單隊列綁定到交換機*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 將訂單延遲隊列綁定到交換機*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}}

    在RabbitMQ管理頁面可以看到以下交換機和隊列




    交換機及隊列說明

    • mall.order.direct(取消訂單消息隊列所綁定的交換機):綁定的隊列為mall.order.cancel,一旦有消息以mall.order.cancel為路由鍵發過來,會發送到此隊列。
    • mall.order.direct.ttl(訂單延遲消息隊列所綁定的交換機):綁定的隊列為mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl為路由鍵發送過來,會轉發到此隊列,并在此隊列保存一定時間,等到超時后會自動將消息發送到mall.order.cancel(取消訂單消息消費隊列)。

    添加延遲消息的發送者CancelOrderSender

    用于向訂單延遲消息隊列(mall.order.cancel.ttl)里發送消息。

    package com.macro.mall.tiny.component;import com.macro.mall.tiny.dto.QueueEnum;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/*** 取消訂單消息的發出者* Created by macro on 2018/9/14.*/@Componentpublic class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//給延遲隊列發送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//給消息設置延遲毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);}}

    添加取消訂單消息的接收者CancelOrderReceiver

    用于從取消訂單的消息隊列(mall.order.cancel)里接收消息。

    package com.macro.mall.tiny.component;import com.macro.mall.tiny.service.OmsPortalOrderService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/*** 取消訂單消息的處理者* Created by macro on 2018/9/14.*/@Component@RabbitListener(queues = "mall.order.cancel")public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);@Autowiredprivate OmsPortalOrderService portalOrderService;@RabbitHandlerpublic void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);}}

    添加OmsPortalOrderService接口

    package com.macro.mall.tiny.service;import com.macro.mall.tiny.common.api.CommonResult;import com.macro.mall.tiny.dto.OrderParam;import org.springframework.transaction.annotation.Transactional;/*** 前臺訂單管理Service* Created by macro on 2018/8/30.*/public interface OmsPortalOrderService {/*** 根據提交信息生成訂單*/@TransactionalCommonResult generateOrder(OrderParam orderParam);/*** 取消單個超時訂單*/@Transactionalvoid cancelOrder(Long orderId);}

    添加OmsPortalOrderService的實現類OmsPortalOrderServiceImpl

    package com.macro.mall.tiny.service.impl;import com.macro.mall.tiny.common.api.CommonResult;import com.macro.mall.tiny.component.CancelOrderSender;import com.macro.mall.tiny.dto.OrderParam;import com.macro.mall.tiny.service.OmsPortalOrderService;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;/*** 前臺訂單管理Service* Created by macro on 2018/8/30.*/@Servicepublic class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);@Autowiredprivate CancelOrderSender cancelOrderSender;@Overridepublic CommonResult generateOrder(OrderParam orderParam) {//todo 執行一系類下單操作,具體參考mall項目LOGGER.info("process generateOrder");//下單完成后開啟一個延遲消息,用于當用戶沒有付款時取消訂單(orderId應該在下單后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下單成功");}@Overridepublic void cancelOrder(Long orderId) {//todo 執行一系類取消訂單操作,具體參考mall項目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//獲取訂單超時時間,假設為60分鐘long delayTimes = 30 * 1000;//發送延遲消息cancelOrderSender.sendMessage(orderId, delayTimes);}}

    添加OmsPortalOrderController定義接口

    package com.macro.mall.tiny.controller;import com.macro.mall.tiny.dto.OrderParam;import com.macro.mall.tiny.service.OmsPortalOrderService;import io.swagger.annotations.Api;import io.swagger.annotations.ApiOperation;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestMethod;import org.springframework.web.bind.annotation.ResponseBody;/*** 訂單管理Controller* Created by macro on 2018/8/30.*/@Controller@Api(tags = "OmsPortalOrderController", description = "訂單管理")@RequestMapping("/order")public class OmsPortalOrderController {@Autowiredprivate OmsPortalOrderService portalOrderService;@ApiOperation("根據購物車信息生成訂單")@RequestMapping(value = "/generateOrder", method = RequestMethod.POST)@ResponseBodypublic Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);}}

    進行接口測試

    調用下單接口

    注意:已經將延遲消息時間設置為30秒

    總結

    以上是生活随笔為你收集整理的RabbitMQ实现延迟消息的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。