javascript
消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ
系列文章導航:?Spring Cloud Alibaba微服務解決方案
常用MQ產品的選擇
目前主流的MQ產品有kafka、RabbitMQ、ActiveMQ、RocketMQ等。在MQ選型時可以參照這篇文章選擇合適的MQ產品。
RocketMQ及控制臺搭建
RocketMQ的搭建可以參考這篇文章。
RocketMQ控制臺的搭建可以參考這篇文章。
RocketMQ與Spring Boot整合
pom.xml中添加如下依賴
org.apache.rocketmq
rocketmq-spring-boot-starter
2.0.3
配置文件中添加如下配置
rocketmq:
name-server: 172.17.0.102:9876
producer:
# 構建rocketMQtemplate必須指定group
group: test-producer
生產者發送消息
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareServiceImpl implements IShareService {
private final @NonNull RocketMQTemplate rocketMQTemplate;
@Override
public Share aduitById(Integer id, ShareAuditDTO shareAuditDTO) {
//第一個參數為主題topic,第二個參數為消息體對象
rocketMQTemplate.convertAndSend("add-bonus",
UserAddBonusMsgDTO.builder()
.userId(share.getUserId())
.bonus(50).build());
return share;
}
}
發送后可在RocketMQ-Console控制臺查看
消費者監聽消息并進行業務處理
@Service
//在@RocketMQMessageListener注解中設置消費者group和監聽主題topic
@RocketMQMessageListener(consumerGroup = "test-consumer", topic = "add-bonus")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
//將監聽的消息體對象指定為RocketMQListener類的泛型
public class AddBonusListener implements RocketMQListener{
@Override
public void onMessage(UserAddBonusMsgDTO userAddBonusMsgDTO) {
//處理業務邏輯
// do something ...
}
}
事務消息
事務消息會將發送的消息進行標記,在收到commit指令后才會進行消息投遞。事務消息的執行流程如下圖:
這個方案有個前提,它假設消費者總是有能力成功處理消息。如果消費者消費失敗,可以進行重試,如果依然失敗,會進入死信隊列。進入死信隊列的消息可以重新入隊,或者人工介入去處理。當然,也可以對消費失敗的消息加入補償機制,來保證數據的一致性。
發送半消息
@Slf4j
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareServiceImpl implements IShareService {
private final @NonNull RocketMQTemplate rocketMQTemplate;
@Override
public String auditById(Integer id, ShareAuditDTO shareAuditDTO, UserAddBonusMsgDTO userAddBonusMsgDTO) {
String transactionId = UUID.randomUUID().toString();
rocketMQTemplate.sendMessageInTransaction(
//group
"tx-add-bonus-group",
//topic
"add-bonus",
//消息體
MessageBuilder.withPayload(userAddBonusMsgDTO)
//設置header,執行本地事務時可以獲取使用
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", id).build()
,
//設置arg,執行本地事務時可以獲取使用
shareAuditDTO
);
return "success";
}
}
本地事務執行及狀態檢查
//指定監聽本地事務的group
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddbonusTransactionListener implements RocketMQLocalTransactionListener {
private final @NonNull IShareService shareService;
private final @NonNull RocketmqTransactionLogMapper rocketmqTransactionLogMapper;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
//執行本地事務
MessageHeaders headers = message.getHeaders();
String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
Integer shareId = Integer.valueOf((String) headers.get("share_id"));
try {
//service方法中將生成的transactionId進行存儲
shareService.auditByIdInDBWithRocketMQLog(shareId, (ShareAuditDTO) o, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
//檢查本地事務是否執行成功
String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
//通過檢查本地事務日志記錄,確認本地事務是否執行成功
RocketmqTransactionLog rocketmqTransactionLog = rocketmqTransactionLogMapper.selectOne(
RocketmqTransactionLog.builder()
.transactionId(transactionId).build()
);
if (rocketmqTransactionLog != null) {
return RocketMQLocalTransactionState.COMMIT;
}
return RocketMQLocalTransactionState.ROLLBACK;
}
}
RocketMQ與Spring Cloud Stream整合
添加依賴
com.alibaba.cloud
spring-cloud-starter-stream-rocketmq
yml添加配置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: ${rocketmq.name-server}
bindings:
output:
destination: stream-test-topic
#名稱需和@output注解中指定的名稱一致
my-output:
#指定輸出的topic
destination: stream-my-topic
#名稱需和@input注解中指定的名稱一致
my-input:
#指定接收的topic
destination: stream-my-topic
#如果整合RocketMQ,必須設置group
#如果整合其他MQ,可留空
group: binder-my-group
生產者
編寫自定義Source接口,使用@Output注解指定消息管道的名稱,需與yml配置文件中bindings下配置的管道名稱一致,才能在IOC注入時獲取到yml中指定的destination值做為發送的topic,如果不對應會默認使用@Output注解指定的值作為topic,自定義消費者Sink時同理。
public interface MySource {
String MY_OUTPUT = "my-output";
@Output(MY_OUTPUT)
MessageChannel output();
}
啟動類注冊自定義Source接口
@SpringBootApplication
@EnableBinding({Source.class, MySource.class})
public class ContentCenterApplication {
public static void main(String[] args) {
SpringApplication.run(ContentCenterApplication.class, args);
}
}
調用自定義Source接口發送消息
@RestController
@RequestMapping
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class TestController {
private final @NonNull MySource mySource;
@GetMapping("/test-stream")
public String testStream() {
mySource.output().send(MessageBuilder.withPayload("測試stream-2消息體").build());
return "success";
}
}
消費者
編寫自定義Sink接口,使用@Input注解指定消息管道的名稱,需與yml配置文件中bindings下配置的管道名稱一致。
public interface MySink {
String MY_INPUT = "my-input";
@Input(MY_INPUT)
SubscribableChannel input();
}
啟動類注冊自定義Sink接口
@SpringBootApplication
@EnableBinding({Sink.class, MySink.class})
public class UserCenterApplication {
public static void main(String[] args) {
SpringApplication.run(UserCenterApplication.class, args);
}
}
使用@StreamListener注解指定自定義Sink監聽消息并處理
@Service
@Slf4j
public class MyTestStreamConsumer {
@StreamListener(MySink.MY_INPUT)
public void receive(String messageBody) {
log.info("自定義接收器,通過stream收到消息:messageBody = {}", messageBody);
}
}
使用Source發送事務消息
使用Spring Cloud Stream發送RocketMQ的事務消息時,Source接口發送的消息無法在方法調用時指定事務消息的監聽group,需在yml配置中進行設置
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 172.17.0.102:9876
bindings:
#管道名稱需與stream.bindings對應
output:
producer:
#標注為事務消息
transactional: true
#事務消息監聽group名稱,對應@RocketMQTransactionListener注解txProducerGroup屬性
group: tx-add-bonus-group
bindings:
output:
destination: add-bonus
發送事務消息時,僅能夠通過headers發送參數。
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class ShareServiceImpl implements IShareService {
private final @NonNull Source source;
@Override
public Share auditById(Integer id, ShareAuditDTO shareAuditDTO) {
//....
String transactionId = UUID.randomUUID().toString();
source.output().send(
MessageBuilder.withPayload(
UserAddBonusMsgDTO.builder()
.userId(share.getUserId())
.bonus(50).build())
//設置header,執行本地事務時可以獲取使用
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("share_id", id)
.setHeader("dto", JSON.toJSONString(shareAuditDTO))
.build()
);
//....
return share;
}
}
Spring Cloud Stream消息過濾消費
參考這篇文章。
Spring Cloud Stream異常處理
參考這篇文章。
Spring Cloud Stream概念及注解
參考這篇文章。
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的消息驱动 微服务器,消息驱动的微服务-Spring Cloud Stream整合RocketMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阵列信号处理及matlab实现,《阵列信
- 下一篇: java json转excel_JSON