日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

志宇-RocketMQ学习

發(fā)布時間:2024/3/26 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 志宇-RocketMQ学习 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

RocketMQ

  • RocketMQ安裝
  • RocketMQ-console安裝
  • RocketMQ簡單使用
  • RabbitMQ核心概念
    • 消息發(fā)送狀態(tài)(返回對象中的枚舉類型有4種)
    • 重試次數(shù)
    • RocketMQ發(fā)送消息三種方式
    • RocketMQ延遲消息設(shè)置
    • 消息順序消費
    • 消費端配置參數(shù)詳情
    • tag標(biāo)簽
    • 消費模式
    • Offset和CommitLog
  • 事務(wù)使用
  • 集群部署

RocketMQ安裝

官方安裝方法
先后安裝Name Server(起到路由功能)和 Broker(RocketMQ服務(wù)) 然后測試下發(fā)送和接收可用

內(nèi)存不夠在/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/bin目錄下修改runbroker.sh tools.sh runserver.sh這三個文件中JAVA_OPT參數(shù),修改完如下

#僅僅修改已經(jīng)存在的配置即可,4g換成256m或者128m #runbroker.sh broker占用的內(nèi)存大小 #tools.sh 測試發(fā)送和接收工具的內(nèi)存大小 #runserver.sh 路由的內(nèi)存和大小 JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"

RocketMQ-console安裝

  • 下載
  • 方式一、git下載,執(zhí)行如下命令 git clone https://github.com/apache/rocketmq-externals.git方式二、直接下載,訪問如下地址即可 https://github.com/apache/rocketmq-externals/archive/master.zip
  • 修改配置
  • 找到rocketmq-console/src/main/resources/application.properties 修改配置

    #console端口 server.port=17890 #name server地址 rocketmq.config.namesrvAddr=localhost:9876

    找到rocketmq-console目錄下的pom.xml文件,修改配置

    <rocketmq.version>4.4.0XXX</rocketmq.version> 4.4.0XXX 修改為 4.4.0(你的RocketMQ版本) <rocketmq.version>4.4.0</rocketmq.version>
  • 編譯
  • 進(jìn)入rocketmq-console目錄,編譯打包 mvn clean package -Dmaven.test.skip=true
    進(jìn)入target目錄 ,啟動 java -jar rocketmq-console-ng-1.0.0.jar
    守護(hù)進(jìn)程方式啟動 nohup java -jar rocketmq-console-ng-1.0.0.jar &

    RocketMQ簡單使用

    常見錯誤一

    org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

    解決辦法
    因為服務(wù)器中可能有多塊網(wǎng)卡,rocketmq要指定公網(wǎng)ip
    在配置文件rocketmq-all-4.8.0-source-release/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf中添加
    brokerIP1=39.107.109.94此ip為公網(wǎng)ip
    然后重新啟動 ,啟動命令如下:
    jps 找到啟動進(jìn)程,kill -9 關(guān)閉進(jìn)程,然后已守護(hù)進(jìn)程啟動
    nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

    常見錯誤二

    MQClientException: No route info of this topic, TopicTest1

    解決辦法
    來到/distribution/target/rocketmq-4.8.0/rocketmq-4.8.0目錄下
    查看已經(jīng)配置信息sh bin/mqbroker -m 配置信息如下

    2021-02-18 16\:37\:49 INFO main - namesrvAddr= 2021-02-18 16\:37\:49 INFO main - brokerIP1=172.18.0.1 2021-02-18 16\:37\:49 INFO main - brokerName=iZ2ze8twmjge9w7qf1yhyyZ 2021-02-18 16\:37\:49 INFO main - brokerClusterName=DefaultCluster 2021-02-18 16\:37\:49 INFO main - brokerId=0 2021-02-18 16\:37\:49 INFO main - autoCreateTopicEnable=true 2021-02-18 16\:37\:49 INFO main - autoCreateSubscriptionGroup=true 2021-02-18 16\:37\:49 INFO main - msgTraceTopicName=RMQ_SYS_TRACE_TOPIC 2021-02-18 16\:37\:49 INFO main - traceTopicEnable=false 2021-02-18 16\:37\:49 INFO main - rejectTransactionMessage=false 2021-02-18 16\:37\:49 INFO main - fetchNamesrvAddrByAddressServer=false 2021-02-18 16\:37\:49 INFO main - transactionTimeOut=6000 2021-02-18 16\:37\:49 INFO main - transactionCheckMax=15 2021-02-18 16\:37\:49 INFO main - transactionCheckInterval=60000 2021-02-18 16\:37\:49 INFO main - aclEnable=false 2021-02-18 16\:37\:49 INFO main - storePathRootDir=/root/store 2021-02-18 16\:37\:49 INFO main - storePathCommitLog=/root/store/commitlog 2021-02-18 16\:37\:49 INFO main - flushIntervalCommitLog=500 2021-02-18 16\:37\:49 INFO main - commitIntervalCommitLog=200 2021-02-18 16\:37\:49 INFO main - flushCommitLogTimed=false 2021-02-18 16\:37\:49 INFO main - deleteWhen=04 2021-02-18 16\:37\:49 INFO main - fileReservedTime=72 2021-02-18 16\:37\:49 INFO main - maxTransferBytesOnMessageInMemory=262144 2021-02-18 16\:37\:49 INFO main - maxTransferCountOnMessageInMemory=32 2021-02-18 16\:37\:49 INFO main - maxTransferBytesOnMessageInDisk=65536 2021-02-18 16\:37\:49 INFO main - maxTransferCountOnMessageInDisk=8 2021-02-18 16\:37\:49 INFO main - accessMessageInMemoryMaxRatio=40 2021-02-18 16\:37\:49 INFO main - messageIndexEnable=true 2021-02-18 16\:37\:49 INFO main - messageIndexSafe=false 2021-02-18 16\:37\:49 INFO main - haMasterAddress= 2021-02-18 16\:37\:49 INFO main - brokerRole=ASYNC_MASTER 2021-02-18 16\:37\:49 INFO main - flushDiskType=ASYNC_FLUSH 2021-02-18 16\:37\:49 INFO main - cleanFileForciblyEnable=true 2021-02-18 16\:37\:49 INFO main - transientStorePoolEnable=false

    修改配置文件distribution/target/rocketmq-4.8.0/rocketmq-4.8.0/conf/broker.conf中autoCreateTopicEnable=true,true 則代表可以自動創(chuàng)建topic,生產(chǎn)上要關(guān)閉,如果這個參數(shù)是true還創(chuàng)建不了topic可能是因為程序引入jar包的版本和RabbitMQ版本不同導(dǎo)致的

    常用配置說明

    compressMsgBodyOverHowmuch :消息超過默認(rèn)字節(jié)4096后進(jìn)行壓縮 retryTimesWhenSendFailed : 失敗重發(fā)次數(shù) maxMessageSize : 最大消息配置,默認(rèn)128k topicQueueNums : 主題下面的隊列數(shù)量,默認(rèn)是4 autoCreateTopicEnable : 是否自動創(chuàng)建主題Topic, 開發(fā)建議為true,生產(chǎn)要為false defaultTopicQueueNums : 自動創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊列數(shù) autoCreateSubscriptionGroup: 是否允許 Broker 自動創(chuàng)建訂閱組,建議線下開發(fā)開啟,線上關(guān)閉 brokerClusterName : 集群名稱 brokerId : 0表示Master主節(jié)點 大于0表示從節(jié)點 brokerIP1 : Broker服務(wù)地址 brokerRole : broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE deleteWhen : 每天執(zhí)行刪除過期文件的時間,默認(rèn)每天凌晨4點 flushDiskType :刷盤策略, 默認(rèn)為 ASYNC_FLUSH(異步刷盤), 另外是SYNC_FLUSH(同步刷盤) listenPort : Broker監(jiān)聽的端口號 mapedFileSizeCommitLog : 單個conmmitlog文件大小,默認(rèn)是1GB mapedFileSizeConsumeQueue:ConsumeQueue每個文件默認(rèn)存30W條,可以根據(jù)項目調(diào)整 storePathRootDir : 存儲消息以及一些配置信息的根目錄 默認(rèn)為用戶的 ${HOME}/store storePathCommitLog:commitlog存儲目錄默認(rèn)為${storePathRootDir}/commitlog storePathIndex: 消息索引存儲路徑 syncFlushTimeout : 同步刷盤超時時間 diskMaxUsedSpaceRatio : 檢測可用的磁盤空間大小,超過后會寫入報錯(磁盤沒有滿卻寫入不進(jìn)RabbitMQ是就有可能是因為這個參數(shù),這個參數(shù)就是當(dāng)磁盤還剩余百分之多少時就不允許在寫入消息了)

    常見錯誤三
    控制臺查看不了數(shù)據(jù),提示連接 10909錯誤,因為RocketMQ自帶VIP虛擬ip技術(shù),這時要防火墻要開放10909端口,才能使用

    RabbitMQ核心概念

    消息發(fā)送狀態(tài)(返回對象中的枚舉類型有4種)

    生產(chǎn)者發(fā)送完信息返回的類型(返回類型SendResult中的SendStatus成員變量)
    FLUSH_DISK_TIMEOUT
    在指定時間沒有將消息同步到磁盤中(刷盤策略需要為SYNC_FLUSH 才會出這個錯誤)
    例如:在cpu爆滿時候?qū)е聸]有刷盤成功

    FLUSH_SLAVE_TIMEOUT
    主從模式下,broker是SYNC_MASTER, 沒有在規(guī)定時間內(nèi)完成主從同步
    例如:網(wǎng)路原因等,導(dǎo)致主從同步?jīng)]有成功,如果是異步復(fù)制則不會出現(xiàn)這個問題

    SLAVE_NOT_AVAILABLE
    從模式下,broker是SYNC_MASTER, 但是沒有找到被配置成Slave的Broker
    例如:主從模式下,所有從節(jié)點宕機,如果是異步復(fù)制則不會出現(xiàn)這個問題

    SEND_OK
    發(fā)送成功,沒有發(fā)生上面的三種問題

    重試次數(shù)

    生產(chǎn)者重試:RocketMQ中默認(rèn)次數(shù)是2次,一般不是跨國調(diào)用不用修改重試次數(shù)了
    重試次數(shù)設(shè)置方法如下

    //設(shè)置方法如下 private DefaultMQProducer producer; //設(shè)置同步調(diào)用重試次數(shù) producer.setRetryTimesWhenSendFailed(0); //設(shè)置異步調(diào)用重試次數(shù) producer.setRetryTimesWhenSendAsyncFailed(0);

    異步發(fā)送不會重試,需自己書寫代碼重試

    //這里會開啟一個線程異步發(fā)送消息 produceProxy.getProducet().send(message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {//處理消息}@Overridepublic void onException(Throwable e) {//進(jìn)行重試} });

    消費者重試:默認(rèn)次數(shù)16次,當(dāng)網(wǎng)絡(luò)中斷、消費者報錯、ack確認(rèn)失敗導(dǎo)致重試
    手動設(shè)置重試次數(shù)方法如下

    //1、廣播方式不支持重試機制 //2、無論發(fā)送多少遍key都不變 //3、重試間隔時間默認(rèn)為:`messageDelayLevel`=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h//設(shè)置為廣播模式,如果是廣播模式則重試次數(shù)失效//consumer.setMessageModel(MessageModel.BROADCASTING);//設(shè)置為集群模式,默認(rèn)是集群模式consumer.setMessageModel(MessageModel.CLUSTERING);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {MessageExt messageExt = msgs.get(0);//不會變這個keys,是發(fā)送過來的String keys = messageExt.getKeys();//重試次數(shù)int reconsumeTimes = messageExt.getReconsumeTimes();try{//TODO 業(yè)務(wù)邏輯處理return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}catch (Exception e){if (reconsumeTimes>=3) {//TODO 記錄數(shù)據(jù)庫或者發(fā)送郵件給運維人員return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}});

    RocketMQ發(fā)送消息三種方式

    同步發(fā)送: 發(fā)送驗證碼,郵件通知中使用,速快快,當(dāng)前線程反饋,可靠
    異步發(fā)送:注冊發(fā)送優(yōu)惠券中使用,速快快,非前線程反饋,可靠
    OneWay: 日志采集中使用,速度最快,沒有反饋,相對不可靠

    RocketMQ延遲消息設(shè)置

    開源版本不支持定時發(fā)送,只支持固定的發(fā)送時間
    在源碼中rocketmq-store 項目> MessageStoreConfig.java >的成員變量 messageDelayLevel中有固定的延遲時間

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    發(fā)送消息時設(shè)定延遲等級

    Message message = new Message("topicName","TagA","sendMessage"); //0等級代表不延遲 message.setDelayTimeLevel(0) //1代表延遲一秒鐘 message.setDelayTimeLevel(1) //2代表延遲5秒鐘 //1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 以此類推 message.setDelayTimeLevel(2)

    消息順序消費

    全局順序消費:例如比特幣交易過程中人民幣換成美元時候按照從低到高的順序
    局部順序消費:例如 訂單狀態(tài)的消息提醒
    局部順序的使用說明:
    1、RocketMQ上默認(rèn)一個topic上有4個queue,順序消費要將消息投遞到同一個topic對應(yīng)的同一個隊列中(根據(jù)業(yè)務(wù)id取模然后投遞到同一個隊列上,通過MessageQueueSelector實現(xiàn)類實現(xiàn))
    2、順序消息暫不支持廣播模式、異步發(fā)送方式
    3、順序消費支持多個消費者進(jìn)行消費(消費者消費前會對消費的隊列加鎖)
    4、消費者部署的節(jié)點數(shù)要小于此topic對應(yīng)的queue的數(shù)量(消費數(shù)量會均等分給消費者,不然有的消費者收不到消息)
    5、消費者單線程處理,使用MessageListenerOrderly替代MessageListenerConcurrently
    發(fā)送消息偽代碼如下

    //假設(shè)是訂單號 Integer orderId = 1; Message message = new Message(RocketMQConfig.orderTopicName, "TagB", ("sendMessage").getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult send = produceProxy.getOrderProducer().send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {//傳遞過來的 orderId (唯一標(biāo)識)Long queueNum = (Long) arg;//此topic上有幾個queueint size = mqs.size();//判斷使用哪個keylong selectNo = queueNum % size;//選擇的queue數(shù)量必須小于配置的,否則會出錯//返回選擇第幾個queuereturn mqs.get((int) selectNo);} }, orderId); send.getSendStatus();

    接收消息偽代碼如下

    //這里使用MessageListenerOrderly替代MessageListenerConcurrently(使用單線程消費) consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {MessageExt messageExt = msgs.get(0);String body = new String(messageExt.getBody(), "utf-8");System.out.println(body);return ConsumeOrderlyStatus.SUCCESS;}} );

    消費端配置參數(shù)詳情

    //設(shè)置組 一個項目中一個組對應(yīng)一個消費者 consumer = new DefaultMQPushConsumer(RocketMQConfig.groupName); //設(shè)置地址 consumer.setNamesrvAddr(RocketMQConfig.serverAddresses); //設(shè)置訂閱對應(yīng)的topic consumer.subscribe(RocketMQConfig.topicName, "*"); //設(shè)置默認(rèn)消費隊列中最后一個,默認(rèn)也是這個配置 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //消費者均等消費策略 ,默認(rèn)也是這個配置 consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()); //設(shè)置存儲在本地還是遠(yuǎn)程,默認(rèn)廣播存儲在消費者本地、其他存儲在遠(yuǎn)程 //consumer.setOffsetStore(new RemoteBrokerOffsetStore()); //設(shè)置消費者最大線程數(shù) consumer.setConsumeThreadMax(100); //設(shè)置消費者最小線程數(shù) consumer.setConsumeThreadMin(5); //消費者一次從mq中拉取多少條數(shù)據(jù) consumer.setPullBatchSize(32); //拉取后每次消費多少條 consumer.setConsumeMessageBatchMaxSize(1); //設(shè)置為廣播模式,如果是廣播模式則重試次數(shù)失效 //consumer.setMessageModel(MessageModel.BROADCASTING); //設(shè)置為集群模式,默認(rèn)是集群模式 consumer.setMessageModel(MessageModel.CLUSTERING);

    tag標(biāo)簽

    消費者可以選擇消費 某個Group 中的某個 topic 中的指定Tag
    1、消費者手動過濾Tag (沒有用到的tag也進(jìn)行傳輸浪費資源)
    2、RocketMQ選擇發(fā)送給消費者

    消費模式

    1、Broker獲得消息然后將消息發(fā)送給消費者(消費者壓力大)
    2、消費者間隔去向Broker拉取(沒有消息也會去拉取、間隔時間不好設(shè)置)
    3、Broker和消費者之間每15秒發(fā)起一次長連接(默認(rèn))

    Offset和CommitLog

    CommitLog用于存儲發(fā)送的消息內(nèi)容
    Offset 用于存儲消息存儲在隊列中的下標(biāo)
    CommitLog默認(rèn)存儲位置 根目錄下store/consumequeue/{topicName}/{queueid}/fileName

    事務(wù)使用

    集群部署

    推薦同步雙寫、異步刷盤

    總結(jié)

    以上是生活随笔為你收集整理的志宇-RocketMQ学习的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。