65.RocketMQ
RocketMQ
1.簡介
首先要先了解一下為什么要引入RocketMQ消息隊列是什么?
? 比如公司本身的業務體量很小,所以直接單機一把梭哈都能搞定了,但是后面業務體量不斷擴大,采用微服務的設計思想,分布式的部署方式,所以拆分了很多的服務,隨著體量的增加以及業務場景越來越復雜了,很多場景單機的技術棧和中間件以及不夠用了,而且對系統的友好性也下降了,最后做了很多技術選型的工作,我們決定引入消息隊列中間件。
主要功能:異步、削峰、解耦
RocketMQ:
它是一款分布式、隊列模型(queue)的消息中間件,是Alibaba自主研發的專業消息中間件,實現了業務消峰、分布式事務的優秀框架。
2.安裝
工欲善其事必先利其器
image-20211207192901447.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-WSxb0pD4-1638880091963)(C:\Users\王元元\AppData\Roaming\Typora\typora-user-images\image-20211207193256601.png)]
簡單來講,binary是編譯好的可以直接使用,source是還沒編譯過的源代碼,需要自行編譯。
4.解壓
5.配置環境變量
6
ROCKETMQ_HOME="D:\rocketmq" NAMESRV_ADDR="localhost:9876" # 啟動 nameserver .\bin\mqnamesrv.cmd # 啟動broker .\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true3.使用
3.1案例一
創建maven quickstart項目
添加依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version> </dependency>發送
package com.woniuxy.cloud.simple;import com.woniuxy.cloud.AppConstants; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException; import java.util.Scanner;public class Sender {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {//(1)創建生產者/*** 生產者分組*/DefaultMQProducer producer = new DefaultMQProducer("TestSender");producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);//(2)啟動producerproducer.start();//(3)構建消息并發送Scanner scanner = new Scanner(System.in);while (true) {System.out.println("請輸入要發送的消息");String smsContent = scanner.next();if (smsContent.equals("exit")) {//(4)關閉producerproducer.shutdown();}Message msg = new Message(AppConstants.SMS_TOPIC, "user_register", smsContent.getBytes("UTF-8"));//同步發送到RocketMQSendResult sendResult = producer.send(msg);System.out.println("sendResult:" + sendResult);}} }接收
3.2發送的三種模式
1.發送-同步確認發送結果
同步發送是指消息發送方發出一條消息后,會在收到服務端響應后才發嚇一條的通訊方式。
- 應用場景:此場景應用非常廣泛,ex:重要的通知郵件、報名短信通知、營銷短信系統等。
2.發送-異步確認發送結果
3.發送-結束 oneway
發送方只負責發送消息,不等待服務端返回響應且沒有回調函數觸發,即只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。
- 應用場景:適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。
對比
他的優缺點是啥
RocketMQ優點:
單機吞吐量:十萬級
可用性:非常高,分布式架構
消息可靠性:經過參數優化配置,消息可以做到0丟失
功能支持:MQ功能較為完善,還是分布式的,擴展性好
支持10億級別的消息堆積,不會因為堆積導致性能下降
源碼是java,我們可以自己閱讀源碼,定制自己公司的MQ,可以掌控
天生為金融互聯網領域而生,對于可靠性要求很高的場景,尤其是電商里面的訂單扣款,以及業務削峰,在大量交易涌入時,后端可能無法及時處理的情況
RoketMQ在穩定性上可能更值得信賴,這些業務場景在阿里雙11已經經歷了多次考驗,如果你的業務有上述并發場景,建議可以選擇RocketMQ
RocketMQ缺點:
支持的客戶端語言不多,目前是java及c++,其中c++不成熟
社區活躍度不是特別活躍那種
沒有在 mq 核心中去實現JMS等接口,有些系統要遷移需要修改大量代碼
消息類型
分類:普通消息、順序消息、延時消息、事務消息
消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。
順序消費的原理解析,在默認的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。
下面用訂單進行分區有序的示例。一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。
延時消息
比如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
現在RocketMq并不支持任意時間的延時,需要設置幾個固定的延時等級,從1s到2h分別對應著等級1到18 消息消費失敗會進入延時消息隊列,消息發送時間與設置的延時等級和重試次數有關,詳見代碼SendMessageProcessor.java
付款就取消訂單釋放庫存。
private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
現在RocketMq并不支持任意時間的延時,需要設置幾個固定的延時等級,從1s到2h分別對應著等級1到18 消息消費失敗會進入延時消息隊列,消息發送時間與設置的延時等級和重試次數有關,詳見代碼SendMessageProcessor.java
總結
以上是生活随笔為你收集整理的65.RocketMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网络上演变革实战 宽带讲究精耕细作
- 下一篇: 写个小文件让自己的电脑定时关机吧!