日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > windows >内容正文

windows

RocketMQ事务消息在订单创建和库存扣减的使用

發布時間:2024/1/16 windows 40 coder
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ事务消息在订单创建和库存扣减的使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

下單的過程包括訂單創建,還有庫存的扣減,為提高系統的性能,將庫存放在redis扣減,則會涉及到Mysql和redis之間的數據同步,其中,這個過程還涉及到,必須是訂單創建成功才進行庫存的扣減操作。其次,還涉及到庫存的同步,需要保證訂單創建成功和redis里的庫存都扣減成功,再將庫存數據同步到Mysql,為了實現上述這里情況,可以借助RocketMQ的事務型消息來實現。

流程圖

流程圖如下,這里引入了stocklog,即訂單流水表,通過判斷stocklog的狀態來決定是否commite消息去同步mysql,這里stocklog狀態為成功的前提是訂單入庫和redis庫存扣減成功。

對于RocketMQ的事務消息的進一步解釋:

在第五步執行成功返回可能因為網絡狀況卡住,但是stocklog狀態已經得到修改

如果返回成功 MQ事務就會commite這條消息

如果沒有返回成功 MQ事務會去輪詢stocklog有沒有被修改

一直輪詢發現沒有被修改就會回滾這條消息

這條消息Commit后,就會被MQ的消費者消費,對MySQL的實際庫存進行更新

需要的SQL表

這里簡化一下下單的流程,不涉及用戶表,只涉及到庫存表,庫存流水表,訂單表。

order表

CREATE TABLE `order` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '訂單id',
  `product_id` int(11) DEFAULT NULL COMMENT '產品id',
  `product_num` int(11) DEFAULT NULL COMMENT '產品數量',
  PRIMARY KEY (`id`),
  KEY `product_id_index` (`product_id`) USING BTREE COMMENT '產品id索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

stock表

CREATE TABLE `stock` (
  `id` int(10) NOT NULL AUTO_INCREMENT COMMENT '庫存id',
  `product_id` int(11) DEFAULT NULL COMMENT '產品id',
  `product_name` varchar(255) DEFAULT NULL COMMENT '產品名字',
  `stock_num` int(11) DEFAULT NULL COMMENT '產品庫存',
  PRIMARY KEY (`id`),
  UNIQUE KEY `product_id_index` (`product_id`) USING BTREE COMMENT '產品Id唯一索引'
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4;

stock_log表

CREATE TABLE `stock_log` (
  `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '庫存id',
  `product_id` int(11) DEFAULT NULL COMMENT '產品id',
  `amount` int(11) DEFAULT NULL COMMENT '庫存變化數量',
  `status` int(11) DEFAULT NULL COMMENT '狀態0->初始化,1->成功,2->回滾',
  PRIMARY KEY (`id`),
  KEY `product_id_index` (`product_id`) USING BTREE COMMENT '產品id索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

關鍵代碼

OrderController類

@Controller
@RequestMapping("/order")
@RequiredArgsConstructor
@Slf4j
public class OrderController {

    private final OrderService orderService;

    private final StockLogService stockLogService;

    private final DecreaseStockProducer decreaseStockProducer;

    private final StockService stockService;

    private final RedisTemplate redisTemplate;

    @PostMapping(value = "/create/{id}")
    public ResponseEntity<Object> create(@PathVariable("id") Integer productId) {
        // 檢查redis是否有庫存0的標識
        if (redisTemplate.hasKey("product_stock_invalid_" + productId)) {
            return new ResponseEntity<>("庫存不足", HttpStatus.OK);
        }

        // 先創建庫存流水 這里默認一次只能扣減數量1的庫存
        StockLog stockLog = StockLog.builder()
                .amount(1)
                .productId(productId)
                .status(0)
                .build();
        stockLogService.save(stockLog);

        // 發送事務消息
        try {
            DecreaseStockEvent decreaseStockEvent = DecreaseStockEvent.builder()
                    .productId(productId)
                    .stockLogId(stockLog.getId())
                    .build();
            SendResult sendResult = decreaseStockProducer.sendMessageInTransaction(decreaseStockEvent);
            if (!Objects.equals(sendResult.getSendStatus(), SendStatus.SEND_OK)) {
                log.error("事務消息發送錯誤,請求參數productId:{}", productId);
            }
        } catch (Exception e) {
            log.error("消息發送錯誤,請求參數:{}", productId, e);
        }

        return new ResponseEntity<>("created successfully", HttpStatus.OK);
    }

StockStatusCheckerListener類,執行本地事務和檢查事務

@Slf4j
@RocketMQTransactionListener
@RequiredArgsConstructor
public class StockStatusCheckerListener implements RocketMQLocalTransactionListener {

    private final OrderService orderService;

    private final StockLogService stockLogService;

    private final TransactionTemplate transactionTemplate;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
        log.info("message: {}, args: {}", message, arg);
        TypeReference<MessageWrapper<DecreaseStockEvent>> typeReference = new TypeReference<MessageWrapper<DecreaseStockEvent>>() {};
        MessageWrapper<DecreaseStockEvent> messageWrapper = JSON.parseObject(new String((byte[]) message.getPayload()), typeReference);
        DecreaseStockEvent decreaseStockEvent = messageWrapper.getMessage();
        log.info("decreaseStockEvent info : {}", decreaseStockEvent);
        try {
            orderService.createOrder(decreaseStockEvent.getProductId(), decreaseStockEvent.getStockLogId());
        } catch (Exception e) {
            log.error("插入訂單失敗, decreaseStockEvent info : {}", decreaseStockEvent, e);
            // 觸發回查
            //設置對應的stockLog為回滾狀態
            StockLog stockLog = stockLogService.getOne(new QueryWrapper<StockLog>().eq("id", decreaseStockEvent.getStockLogId()));
            stockLog.setStatus(2);
            stockLogService.updateById(stockLog);
            return RocketMQLocalTransactionState.ROLLBACK;
        }
        return RocketMQLocalTransactionState.COMMIT;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        log.info("message: {}, args: {}", message);
        MessageWrapper<DecreaseStockEvent> messageWrapper = (MessageWrapper) message.getPayload();
        DecreaseStockEvent decreaseStockEvent = messageWrapper.getMessage();
        StockLog stockLog = stockLogService.getOne(new QueryWrapper<StockLog>().eq("id", decreaseStockEvent.getStockLogId()));
        if (stockLog == null) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        // 已經被扣減了庫存
        if (stockLog.getStatus().intValue() == 1) {
            return RocketMQLocalTransactionState.COMMIT;
            // 初始化狀態
        } else if (stockLog.getStatus().intValue() == 0) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }

}

MQ相關代碼,使用模板方法

DecreaseStockProducer,消息生產者,實現了一些指定方法

@Slf4j
@Component
public class DecreaseStockProducer extends AbstractCommonSendProduceTemplate<DecreaseStockEvent> {

    private final ConfigurableEnvironment environment;

    public DecreaseStockProducer(@Autowired RocketMQTemplate rocketMQTemplate, @Autowired ConfigurableEnvironment environment) {
        super(rocketMQTemplate);
        this.environment = environment;
    }

    @Override
    protected BaseSendExtendDTO buildBaseSendExtendParam(DecreaseStockEvent messageSendEvent) {
        return BaseSendExtendDTO.builder()
                .eventName("庫存同步到mysql")
                .keys(String.valueOf(messageSendEvent.getProductId()))
                .topic(environment.resolvePlaceholders(StockMQConstant.STOCK_TOPIC_KEY))
                .tag(environment.resolvePlaceholders(StockMQConstant.STOCK_DEREASE_STOCK_TAG_KEY))
                .sentTimeout(2000L)
                .build();
    }

    @Override
    protected Message<?> buildMessage(DecreaseStockEvent messageSendEvent, BaseSendExtendDTO requestParam) {
        String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();
        return MessageBuilder
                .withPayload(new MessageWrapper(requestParam.getKeys(), messageSendEvent))
                .setHeader(MessageConst.PROPERTY_KEYS, keys)
                .setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag())
                .build();
    }
}

AbstractCommonSendProduceTemplate,發送消息的類

@Slf4j
@RequiredArgsConstructor
public abstract class AbstractCommonSendProduceTemplate<T> {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 構建消息發送事件基礎擴充屬性實體
     *
     * @param messageSendEvent 消息發送事件
     * @return 擴充屬性實體
     */
    protected abstract BaseSendExtendDTO buildBaseSendExtendParam(T messageSendEvent);

    /**
     * 構建消息基本參數,請求頭、Keys...
     *
     * @param messageSendEvent 消息發送事件
     * @param requestParam     擴充屬性實體
     * @return 消息基本參數
     */
    protected abstract Message<?> buildMessage(T messageSendEvent, BaseSendExtendDTO requestParam);

   

    /**
     * 事務消息事件通用發送
     *
     * @param messageSendEvent 事務消息發送事件
     * @return 消息發送返回結果
     */
    public SendResult sendMessageInTransaction(T messageSendEvent) {
        BaseSendExtendDTO baseSendExtendDTO = buildBaseSendExtendParam(messageSendEvent);
        SendResult sendResult;
        try {
            StringBuilder destinationBuilder = StrUtil.builder().append(baseSendExtendDTO.getTopic());
            if (StrUtil.isNotBlank(baseSendExtendDTO.getTag())) {
                destinationBuilder.append(":").append(baseSendExtendDTO.getTag());
            }
            sendResult = rocketMQTemplate.sendMessageInTransaction(
                    destinationBuilder.toString(),
                    buildMessage(messageSendEvent, baseSendExtendDTO),
                    null
            );
            log.info("[{}] 消息發送結果:{},消息ID:{},消息Keys:{}", baseSendExtendDTO.getEventName(), sendResult.getSendStatus(), sendResult.getMsgId(), baseSendExtendDTO.getKeys());
        } catch (Throwable ex) {
            log.error("[{}] 消息發送失敗,消息體:{}", baseSendExtendDTO.getEventName(), JSON.toJSONString(messageSendEvent), ex);
            throw ex;
        }
        return sendResult;
    }

OrderService的createOrder方法:

@Service
@RequiredArgsConstructor
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {

    private final OrderMapper orderMapper;

    private final StockLogMapper stockLogMapper;

    private final RedisTemplate redisTemplate;

    private final TransactionTemplate transactionTemplate;

    private static final String LUA_DECRESE_STOCK_PATH = "lua/decreseStock.lua";

    @Override
    public void createOrder(Integer productId, Integer stockLogId) {

        // 減少Redis里面的庫存
        DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource(LUA_DECRESE_STOCK_PATH)));
        redisScript.setResultType(Long.class);


        // 執行Lua腳本
        Long redisResult = (Long) redisTemplate.execute(redisScript, Collections.singletonList(String.valueOf(productId)));

        if (redisResult < 1L) {
            throw new RuntimeException("庫存售罄");
        }

        // 編程式事務
        transactionTemplate.executeWithoutResult(status -> {
            try {
                // 事務性操作
                Order order = Order.builder()
                        .productId(productId)
                        .productNum(1)
                        .build();
                orderMapper.insert(order);

                // 改stockLog
                StockLog stockLog = stockLogMapper.selectOne(new QueryWrapper<StockLog>().eq("id", stockLogId));
                if (stockLog == null) {
                    throw new RuntimeException("該庫存流水不存在");
                }
                stockLog.setStatus(1);
                stockLogMapper.updateById(stockLog);
                // 如果操作成功,不拋出異常,事務將提交
            } catch (Exception e) {
                // 如果操作失敗,拋出異常,事務將回滾 并且需要補償redis的庫存
                redisTemplate.opsForValue().increment(String.valueOf(productId));
                status.setRollbackOnly();
            }
        });

    }
}

redis的lua腳本代碼如下,這里只會在庫存大于0的時候進行扣減,先檢查庫存,再扣減。如果庫存為0,在redis里面setIfAbsent該商品售罄的標識,這樣子在controller查詢到售罄就直接return

local key = KEYS[1]

-- 檢查鍵是否存在
local exists = redis.call('EXISTS', key)
if exists == 1 then
    -- 鍵存在,獲取值
    local value = redis.call('GET', key)
    if tonumber(value) > 0 then
        -- 如果值大于0,則遞減
        redis.call('DECR', key)
        return 1  -- 表示遞減成功
    else
        local prefix = "product_stock_invalid_"
        local stock_invalid_tag = prefix .. KEYS[1]
        local exists_tag = redis.call('EXISTS', stock_invalid_tag)
        if exists_tag == 0 then
            -- 鍵不存在,設置鍵的值
            redis.call('SET', stock_invalid_tag, "true")
        return 0  -- 表示遞減失敗,值不大于0
        end
    end
else
    return -1  -- 表示遞減失敗,鍵不存在
end

MQ的consumer:

@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = StockMQConstant.STOCK_TOPIC_KEY,
        selectorExpression = StockMQConstant.STOCK_DEREASE_STOCK_TAG_KEY,
        consumerGroup = StockMQConstant.STOCK_DEREASE_STOCK_CG_KEY
)
public class DecreaseStockConsumer implements RocketMQListener<MessageWrapper<DecreaseStockEvent>> {

    private final StockService stockService;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void onMessage(MessageWrapper<DecreaseStockEvent> message) {
        DecreaseStockEvent decreaseStockEvent = message.getMessage();
        Integer productId = decreaseStockEvent.getProductId();
        try {
            stockService.decreaseStock(productId);
        } catch (Exception e) {
            log.error("庫存同步到mysql失敗,productId:{}", productId, e);
            throw e;
        }
    }
}

stockService.decreaseStock()方法如下

    public int decreaseStock(Integer productId) {
        return stockMapper.decreaseStock(productId);
    }

相關的SQL語句

    <update id="decreaseStock">
        UPDATE stock
        SET stock_num = stock_num - 1
        WHERE id = #{id} AND stock_num >= 1
    </update>

消息重復消費問題

我們知道,MQ可能會存在重復消費的問題,包括我在壓測的時候,就存在了重復消費,導致MySQL的庫存最終比redis庫存要少,重復扣減了MySQL的庫存,針對這種情況,應該解決冪等性問題。

在前面我們用MessageWrapper來包裝消息體的時候,每次new一個MessageWrapper都會生成新的UUID,我們將這UUID存到Redis里面來保證冪等性

/**
 * 消息體包裝器
 */
@Data
@Builder
@NoArgsConstructor(force = true)
@AllArgsConstructor
@RequiredArgsConstructor
public final class MessageWrapper<T> implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 消息發送 Keys
     */
    @NonNull
    private String keys;

    /**
     * 消息體
     */
    @NonNull
    private T message;

    /**
     * 唯一標識,用于客戶端冪等驗證
     */
    private String uuid = UUID.randomUUID().toString();

    /**
     * 消息發送時間
     */
    private Long timestamp = System.currentTimeMillis();
}

修改后的扣減庫存方法,先判斷redis里面有沒有存在已經扣除了庫存的標識,有就直接返回

@Service
@RequiredArgsConstructor
public class StockServiceImpl extends ServiceImpl<StockMapper, Stock> implements StockService {

    private final StockMapper stockMapper;

    private final RedisTemplate redisTemplate;

    @Override
    public int decreaseStock(Integer productId, String UUID) {
        if(redisTemplate.hasKey("decrease_mark_" + UUID)) {
            return 0;
        }
        redisTemplate.opsForValue().set("decrease_mark_" + UUID, "true", 24, TimeUnit.HOURS);
        return stockMapper.decreaseStock(productId);
    }
}

下面是上述demo的代碼地址,修改數據庫和mysql地址即可使用

scottyzh/stock-demo: RocketMQ事務消息在訂單生成和扣減庫存的應用 (github.com)

總結

以上是生活随笔為你收集整理的RocketMQ事务消息在订单创建和库存扣减的使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。