日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 前端技术 > javascript >内容正文

javascript

消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ

發布時間:2023/12/20 javascript 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

系列文章導航:?Spring Cloud Alibaba微服務解決方案

常用MQ產品的選擇

目前主流的MQ產品有kafka、RabbitMQ、ActiveMQ、RocketMQ等。在MQ選型時可以參照這篇文章選擇合適的MQ產品。

RocketMQ及控制臺搭建

RocketMQ的搭建可以參考這篇文章。

RocketMQ控制臺的搭建可以參考這篇文章。

RocketMQ與Spring Boot整合

pom.xml中添加如下依賴

org.apache.rocketmq

rocketmq-spring-boot-starter

2.0.3

配置文件中添加如下配置

rocketmq:

name-server: 172.17.0.102:9876

producer:

# 構建rocketMQtemplate必須指定group

group: test-producer

生產者發送消息

@Slf4j

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull RocketMQTemplate rocketMQTemplate;

@Override

public Share aduitById(Integer id, ShareAuditDTO shareAuditDTO) {

//第一個參數為主題topic,第二個參數為消息體對象

rocketMQTemplate.convertAndSend("add-bonus",

UserAddBonusMsgDTO.builder()

.userId(share.getUserId())

.bonus(50).build());

return share;

}

}

發送后可在RocketMQ-Console控制臺查看

消費者監聽消息并進行業務處理

@Service

//在@RocketMQMessageListener注解中設置消費者group和監聽主題topic

@RocketMQMessageListener(consumerGroup = "test-consumer", topic = "add-bonus")

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

//將監聽的消息體對象指定為RocketMQListener類的泛型

public class AddBonusListener implements RocketMQListener{

@Override

public void onMessage(UserAddBonusMsgDTO userAddBonusMsgDTO) {

//處理業務邏輯

// do something ...

}

}

事務消息

事務消息會將發送的消息進行標記,在收到commit指令后才會進行消息投遞。事務消息的執行流程如下圖:

這個方案有個前提,它假設消費者總是有能力成功處理消息。如果消費者消費失敗,可以進行重試,如果依然失敗,會進入死信隊列。進入死信隊列的消息可以重新入隊,或者人工介入去處理。當然,也可以對消費失敗的消息加入補償機制,來保證數據的一致性。

發送半消息

@Slf4j

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull RocketMQTemplate rocketMQTemplate;

@Override

public String auditById(Integer id, ShareAuditDTO shareAuditDTO, UserAddBonusMsgDTO userAddBonusMsgDTO) {

String transactionId = UUID.randomUUID().toString();

rocketMQTemplate.sendMessageInTransaction(

//group

"tx-add-bonus-group",

//topic

"add-bonus",

//消息體

MessageBuilder.withPayload(userAddBonusMsgDTO)

//設置header,執行本地事務時可以獲取使用

.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)

.setHeader("share_id", id).build()

,

//設置arg,執行本地事務時可以獲取使用

shareAuditDTO

);

return "success";

}

}

本地事務執行及狀態檢查

//指定監聽本地事務的group

@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class AddbonusTransactionListener implements RocketMQLocalTransactionListener {

private final @NonNull IShareService shareService;

private final @NonNull RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

@Override

public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {

//執行本地事務

MessageHeaders headers = message.getHeaders();

String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

Integer shareId = Integer.valueOf((String) headers.get("share_id"));

try {

//service方法中將生成的transactionId進行存儲

shareService.auditByIdInDBWithRocketMQLog(shareId, (ShareAuditDTO) o, transactionId);

return RocketMQLocalTransactionState.COMMIT;

} catch (Exception e) {

return RocketMQLocalTransactionState.ROLLBACK;

}

}

@Override

public RocketMQLocalTransactionState checkLocalTransaction(Message message) {

//檢查本地事務是否執行成功

String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);

//通過檢查本地事務日志記錄,確認本地事務是否執行成功

RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(

RocketmqTransactionLog.builder()

.transactionId(transactionId).build()

);

if (rocketmqTransactionLog != null) {

return RocketMQLocalTransactionState.COMMIT;

}

return RocketMQLocalTransactionState.ROLLBACK;

}

}

RocketMQ與Spring Cloud Stream整合

添加依賴

com.alibaba.cloud

spring-cloud-starter-stream-rocketmq

yml添加配置

spring:

cloud:

stream:

rocketmq:

binder:

name-server: ${rocketmq.name-server}

bindings:

output:

destination: stream-test-topic

#名稱需和@output注解中指定的名稱一致

my-output:

#指定輸出的topic

destination: stream-my-topic

#名稱需和@input注解中指定的名稱一致

my-input:

#指定接收的topic

destination: stream-my-topic

#如果整合RocketMQ,必須設置group

#如果整合其他MQ,可留空

group: binder-my-group

生產者

編寫自定義Source接口,使用@Output注解指定消息管道的名稱,需與yml配置文件中bindings下配置的管道名稱一致,才能在IOC注入時獲取到yml中指定的destination值做為發送的topic,如果不對應會默認使用@Output注解指定的值作為topic,自定義消費者Sink時同理。

public interface MySource {

String MY_OUTPUT = "my-output";

@Output(MY_OUTPUT)

MessageChannel output();

}

啟動類注冊自定義Source接口

@SpringBootApplication

@EnableBinding({Source.class, MySource.class})

public class ContentCenterApplication {

public static void main(String[] args) {

SpringApplication.run(ContentCenterApplication.class, args);

}

}

調用自定義Source接口發送消息

@RestController

@RequestMapping

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class TestController {

private final @NonNull MySource mySource;

@GetMapping("/test-stream")

public String testStream() {

mySource.output().send(MessageBuilder.withPayload("測試stream-2消息體").build());

return "success";

}

}

消費者

編寫自定義Sink接口,使用@Input注解指定消息管道的名稱,需與yml配置文件中bindings下配置的管道名稱一致。

public interface MySink {

String MY_INPUT = "my-input";

@Input(MY_INPUT)

SubscribableChannel input();

}

啟動類注冊自定義Sink接口

@SpringBootApplication

@EnableBinding({Sink.class, MySink.class})

public class UserCenterApplication {

public static void main(String[] args) {

SpringApplication.run(UserCenterApplication.class, args);

}

}

使用@StreamListener注解指定自定義Sink監聽消息并處理

@Service

@Slf4j

public class MyTestStreamConsumer {

@StreamListener(MySink.MY_INPUT)

public void receive(String messageBody) {

log.info("自定義接收器,通過stream收到消息:messageBody = {}", messageBody);

}

}

使用Source發送事務消息

使用Spring Cloud Stream發送RocketMQ的事務消息時,Source接口發送的消息無法在方法調用時指定事務消息的監聽group,需在yml配置中進行設置

spring:

cloud:

stream:

rocketmq:

binder:

name-server: 172.17.0.102:9876

bindings:

#管道名稱需與stream.bindings對應

output:

producer:

#標注為事務消息

transactional: true

#事務消息監聽group名稱,對應@RocketMQTransactionListener注解txProducerGroup屬性

group: tx-add-bonus-group

bindings:

output:

destination: add-bonus

發送事務消息時,僅能夠通過headers發送參數。

@Service

@RequiredArgsConstructor(onConstructor = @__(@Autowired))

public class ShareServiceImpl implements IShareService {

private final @NonNull Source source;

@Override

public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {

//....

String transactionId = UUID.randomUUID().toString();

source.output().send(

MessageBuilder.withPayload(

UserAddBonusMsgDTO.builder()

.userId(share.getUserId())

.bonus(50).build())

//設置header,執行本地事務時可以獲取使用

.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)

.setHeader("share_id", id)

.setHeader("dto", JSON.toJSONString(shareAuditDTO))

.build()

);

//....

return share;

}

}

Spring Cloud Stream消息過濾消費

參考這篇文章。

Spring Cloud Stream異常處理

參考這篇文章。

Spring Cloud Stream概念及注解

參考這篇文章。

創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎

總結

以上是生活随笔為你收集整理的消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。