日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

RocketMQ简介、环境搭建

發布時間:2024/4/13 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 RocketMQ简介、环境搭建 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

RocketMQ雙主雙從異步復制集群搭建?

vim /etc/hosts 127.0.0.1 rocketmq-nameserver1 127.0.0.1 rocketmq-master1 127.0.0.1 rocketmq-master1-slavetar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local/ mv alibaba-rocketmq alibaba-rocketmq-3.2.6ln -s alibaba-rocketmq-3.2.6/ rocketmqmkdir /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store/commitlog mkdir /usr/local/rocketmq/store/consumequeue mkdir /usr/local/rocketmq/store/index mkdir /usr/local/rocketmq/store/checkpointcd /usr/local/rocketmq/conf/2m-noslave vim broker-a.properties#所屬集群名字 #一:(4個節點的集群名字要相同) brokerClusterName=rocketmq-cluster #broker名字,注意此處不同的配置文件填寫的不一樣 #二:(如果是broker-a.properties和broker-a-s.properties文件的話修改為broker-a,如果是broker-b.properties和broker-b-s.properties修改為broker-b) brokerName=broker-a #0 表示 Master,>0 表示 Slave #(0 表示 Master,>0 表示 Slave) brokerId=0 #nameServer地址,分號分割 #三:(有幾個節點就應該有幾個nameserver使用";"隔開) namesrvAddr=rocketmq-nameserver1:9876 #在發送消息時,自動創建服務器不存在的topic,默認創建的隊列數 defaultTopicQueueNums=4 #是否允許 Broker 自動創建Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true #是否允許 Broker 自動創建訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽端口 listenPort=10911 #刪除文件時間點,默認凌晨 4點 deleteWhen=04 #文件保留時間,默認 48 小時 fileReservedTime=120 #commitLog每個文件的大小默認1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每個文件默認存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88 #存儲路徑 storePathRootDir=/usr/local/rocketmq/store #commitLog 存儲路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存儲路徑 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存儲路徑 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 異步復制Master #- SYNC_MASTER 同步雙寫Master #- SLAVE #四:(如果是broker-a-s.properties文件和broker-b-s.properties文件的話角色發生變化變為SLAVE) brokerRole=ASYNC_MASTER #刷盤方式 #- ASYNC_FLUSH 異步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #發消息線程池數量 #sendMessageThreadPoolNums=128 #拉消息線程池數量 #pullMessageThreadPoolNums=128mkdir -p /usr/local/rocketmq/logs cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml vim /usr/local/rocketmq/bin/runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m" vim /usr/local/rocketmq/bin/runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m"cd /usr/local/rocketmq/bin/ nohup sh mqnamesrv & jps tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.lognohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-noslave/broker-a.properties >/dev/null 2>&1 &

RocketMQ-單機版安裝及遠程連接測試

yum install wget wget http://www-us.apache.org/dist/rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-4.2.0 cp -r rocketmq-4.2.0 /usr/local/cd rocketmq-4.2.0/ nohup sh bin/mqnamesrv & vim bin/runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" vim bin/runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" vim bin/tools.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log // 查看namaserver日志 jps nohup sh bin/mqbroker -n localhost:9876 & vim /usr/local/rocketmq-4.2.0/conf/broker.conf namesrvAddr=localhost:9876 brokerIP1=localhost nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq-4.2.0/conf/broker.conf & tail -f ~/logs/rocketmqlogs/broker.log sh mqshutdown broker sh mqshutdown namesrv

?

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.aztech.RocketMQ</groupId><artifactId>RocketMQ</artifactId><version>0.0.1-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.2.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.29</version></dependency></dependencies></project> package com.aztech.test;import java.util.List;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//聲明并初始化一個consumer//需要一個consumer group名字作為構造方法的參數,這里為consumer1DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");//consumer.setVipChannelEnabled(false);//同樣也要設置NameServer地址consumer.setNamesrvAddr("localhost:9876");//這里設置的是一個consumer的消費策略//CONSUME_FROM_LAST_OFFSET 默認策略,從該隊列最尾開始消費,即跳過歷史消息//CONSUME_FROM_FIRST_OFFSET 從隊列最開始開始消費,即歷史消息(還儲存在broker的)全部消費一遍//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,默認是半個小時以前consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//設置consumer所訂閱的Topic和Tag,*代表全部的Tagconsumer.subscribe("TopicTest", "*");//設置一個Listener,主要進行消息的邏輯處理consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);System.out.println("----------------------------------------------------------------------------------");//返回消費狀態//CONSUME_SUCCESS 消費成功//RECONSUME_LATER 消費失敗,需要稍后重新消費return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//調用start()方法啟動consumerconsumer.start();System.out.println("Consumer Started.");} } package com.aztech.test;import java.io.UnsupportedEncodingException;import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {// 聲明并初始化一個producer// 需要一個producer group名字作為構造方法的參數,這里為producer1DefaultMQProducer producer = new DefaultMQProducer("producer1");producer.setVipChannelEnabled(false);// 設置NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔// NameServer的地址必須有// producer.setClientIP("localhost");// producer.setInstanceName("Producer");producer.setNamesrvAddr("localhost:9876");// 調用start()方法啟動一個producer實例producer.start();// 發送1條消息到Topic為TopicTest,tag為TagA,消息內容為“Hello RocketMQ”拼接上i的值try {// 封裝消息Message msg = new Message("TopicTest",// topic"TagA",// tag("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)// body);// 調用producer的send()方法發送消息// 這里調用的是同步的方式,所以會有返回結果SendResult sendResult = producer.send(msg);// 打印返回結果System.out.println(sendResult);} catch (RemotingException e) {e.printStackTrace();} catch (MQBrokerException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}//發送完消息之后,調用shutdown()方法關閉producerSystem.out.println("send success");producer.shutdown();} } tar -zxvf apache-tomcat-9.0.2.tar.gz -C /usr/local/mkdir rocketmq-console https://github.com/duomu/rocketmq-console unzip rocketmq.war -d rocketmq-console/ cd /usr/local/apache-tomcat-9.0.2/webapps/rocketmq/WEB-INF/classes vim config.properties rocketmq.namesrv.addr=127.0.0.1:9876 bin/startup.sh tail -f -n 500 ../logs/catalina.out import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {// 聲明并初始化一個producer// 需要一個producer group名字作為構造方法的參數,這里為producer1DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");producer.setNamesrvAddr("localhost:9876");producer.start();for (int i = 0; i < 5; i++) {try {Message msg = new Message("TopicQuickStart",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes()// body);SendResult sendResult = producer.send(msg);System.out.println(sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();} } import java.util.List;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");consumer.setNamesrvAddr("localhost:9876");/*** 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費* 如果非第一次啟動,那么按照上次消費的位置繼續消費*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicQuickStart", "*");//設置一個Listener,主要進行消息的邏輯處理consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {String topic = msg.getTopic();String msgBody = new String(msg.getBody(),"UTF-8");String tags = msg.getTags();System.out.println("收到消息: " + " topic :" + topic + " ,tags :" + tags + " ,msg :" + msgBody);}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}//System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);//System.out.println("----------------------------------------------------------------------------------");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//調用start()方法啟動consumerconsumer.start();System.out.println("Consumer Started.");} }

?

總結

以上是生活随笔為你收集整理的RocketMQ简介、环境搭建的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。