日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

65.RocketMQ

發布時間:2024/1/1 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 65.RocketMQ 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RocketMQ

1.簡介

首先要先了解一下為什么要引入RocketMQ消息隊列是什么?

? 比如公司本身的業務體量很小,所以直接單機一把梭哈都能搞定了,但是后面業務體量不斷擴大,采用微服務的設計思想分布式的部署方式,所以拆分了很多的服務,隨著體量的增加以及業務場景越來越復雜了,很多場景單機的技術棧和中間件以及不夠用了,而且對系統的友好性也下降了,最后做了很多技術選型的工作,我們決定引入消息隊列中間件

主要功能:異步、削峰、解耦

RocketMQ:

它是一款分布式、隊列模型(queue)的消息中間件,是Alibaba自主研發的專業消息中間件,實現了業務消峰、分布式事務的優秀框架。

2.安裝

工欲善其事必先利其器

  • 官網:https://rocketmq.apache.org/



  • image-20211207192901447.png)]

  • [外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-WSxb0pD4-1638880091963)(C:\Users\王元元\AppData\Roaming\Typora\typora-user-images\image-20211207193256601.png)]

    簡單來講,binary是編譯好的可以直接使用,source是還沒編譯過的源代碼,需要自行編譯。

    4.解壓

    5.配置環境變量

    6

    ROCKETMQ_HOME="D:\rocketmq" NAMESRV_ADDR="localhost:9876"
  • # 啟動 nameserver .\bin\mqnamesrv.cmd # 啟動broker .\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

    3.使用

    3.1案例一

  • 創建maven quickstart項目

  • 添加依賴

    <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.1</version> </dependency>
  • 發送

    package com.woniuxy.cloud.simple;import com.woniuxy.cloud.AppConstants; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException; import java.util.Scanner;public class Sender {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {//(1)創建生產者/*** 生產者分組*/DefaultMQProducer producer = new DefaultMQProducer("TestSender");producer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);//(2)啟動producerproducer.start();//(3)構建消息并發送Scanner scanner = new Scanner(System.in);while (true) {System.out.println("請輸入要發送的消息");String smsContent = scanner.next();if (smsContent.equals("exit")) {//(4)關閉producerproducer.shutdown();}Message msg = new Message(AppConstants.SMS_TOPIC, "user_register", smsContent.getBytes("UTF-8"));//同步發送到RocketMQSendResult sendResult = producer.send(msg);System.out.println("sendResult:" + sendResult);}} }
  • 接收

  • package com.woniuxy.cloud.simple;import com.woniuxy.cloud.AppConstants; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt;import java.io.UnsupportedEncodingException; import java.util.List;public class Receiver {public static void main(String[] args) throws MQClientException {//(1)創建消費者實例//消費者分組,同一個名字的消費者組成一個集群DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestReceive");consumer.setNamesrvAddr(AppConstants.ROCKETMQ_NAMESERVER_ADDR);//(2)訂閱某個主題,收到特定的消息consumer.subscribe(AppConstants.SMS_TOPIC,"*");//(3)向MQ注冊一個監聽器/*msgs 消息列表context 消息上下文*/consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msgExt:msgs){try {System.out.println("消息內容:"+new String(msgExt.getBody(),"utf-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// (4)啟動消費者實例consumer.start();System.out.printf("Consumer Started.%n");} }

    3.2發送的三種模式

    1.發送-同步確認發送結果

    同步發送是指消息發送方發出一條消息后,會在收到服務端響應后才發嚇一條的通訊方式。

    • 應用場景:此場景應用非常廣泛,ex:重要的通知郵件、報名短信通知、營銷短信系統等。

    2.發送-異步確認發送結果

    3.發送-結束 oneway

    發送方只負責發送消息,不等待服務端返回響應且沒有回調函數觸發,即只發送請求不等待應答。此方式發送消息的過程耗時非常短,一般在微秒級別。

    • 應用場景:適用于某些耗時非常短,但對可靠性要求并不高的場景,例如日志收集。

    對比

    他的優缺點是啥

    RocketMQ優點:

    單機吞吐量:十萬級

    可用性:非常高,分布式架構

    消息可靠性:經過參數優化配置,消息可以做到0丟失

    功能支持:MQ功能較為完善,還是分布式的,擴展性好

    支持10億級別的消息堆積,不會因為堆積導致性能下降

    源碼是java,我們可以自己閱讀源碼,定制自己公司的MQ,可以掌控

    天生為金融互聯網領域而生,對于可靠性要求很高的場景,尤其是電商里面的訂單扣款,以及業務削峰,在大量交易涌入時,后端可能無法及時處理的情況

    RoketMQ在穩定性上可能更值得信賴,這些業務場景在阿里雙11已經經歷了多次考驗,如果你的業務有上述并發場景,建議可以選擇RocketMQ

    RocketMQ缺點:

    支持的客戶端語言不多,目前是java及c++,其中c++不成熟

    社區活躍度不是特別活躍那種

    沒有在 mq 核心中去實現JMS等接口,有些系統要遷移需要修改大量代碼

    消息類型

    分類:普通消息、順序消息、延時消息、事務消息

    消息有序指的是可以按照消息的發送順序來消費(FIFO)。RocketMQ可以嚴格的保證消息有序,可以分為分區有序或者全局有序。

    順序消費的原理解析,在默認的情況下消息發送會采取Round Robin輪詢方式把消息發送到不同的queue(分區隊列);而消費消息的時候從多個queue上拉取消息,這種情況發送和消費是不能保證順序。但是如果控制發送的順序消息只依次發送到同一個queue中,消費的時候只從這個queue上依次拉取,則就保證了順序。當發送和消費參與的queue只有一個,則是全局有序;如果多個queue參與,則為分區有序,即相對每個queue,消息都是有序的。

    下面用訂單進行分區有序的示例。一個訂單的順序流程是:創建、付款、推送、完成。訂單號相同的消息會被先后發送到同一個隊列中,消費時,同一個OrderId獲取到的肯定是同一個隊列。

    延時消息

    比如電商里,提交了一個訂單就可以發送一個延時消息,1h后去檢查這個訂單的狀態,如果還是未付款就取消訂單釋放庫存。

    private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

    現在RocketMq并不支持任意時間的延時,需要設置幾個固定的延時等級,從1s到2h分別對應著等級1到18 消息消費失敗會進入延時消息隊列,消息發送時間與設置的延時等級和重試次數有關,詳見代碼SendMessageProcessor.java

    付款就取消訂單釋放庫存。

    private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

    現在RocketMq并不支持任意時間的延時,需要設置幾個固定的延時等級,從1s到2h分別對應著等級1到18 消息消費失敗會進入延時消息隊列,消息發送時間與設置的延時等級和重試次數有關,詳見代碼SendMessageProcessor.java

    總結

    以上是生活随笔為你收集整理的65.RocketMQ的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。