日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Rocketmq原理最佳实践

發布時間:2024/4/13 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Rocketmq原理最佳实践 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

一、 MQ背景&選型 消息隊列作為高并發系統的核心組件之一,能夠幫助業務系統解構提升開發效率和系統穩定性。 目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka 具有主要優勢特性有: 支持事務型消息(消息發送和DB操作保持兩方的最終一致性,rabbitmq和kafka不支持) 支持結合rocketmq的多個系統之間數據最終一致性(多方事務,二方事務是前提) 支持18個級別的延遲消息(rabbitmq和kafka不支持) 支持指定次數和時間間隔的失敗消息重發(kafka不支持,rabbitmq需要手動確認) 支持consumer端tag過濾,減少不必要的網絡傳輸(rabbitmq和kafka不支持) 支持重復消費(rabbitmq不支持,kafka支持)二、RocketMQ集群概述 1. RocketMQ集群部署結構 1) Name Server Name Server是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。 2) Broker Broker部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave 只能對應一個Master,Master與Slave的對應關系通過指定相同的Broker Name,不同的Broker Id 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與Name Server集群中的所有節點建立長連接,定時(每隔30s)注冊Topic信息到所有 Name Server。Name Server定時(每隔10s)掃描所有存活broker的連接,如果Name Server超過2分鐘 沒有收到心跳,則Name Server斷開與Broker的連接。3) Producer Producer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由 信息,并向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群 部署。 Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關聯的broker發送 心跳,Broker每隔10s中掃描所有存活的連接,如果Broker在2分鐘內沒有收到心跳數據,則關閉與Producer 的連接。4) Consumer Consumer與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由 信息,并向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer 既可以從Master訂閱消息,也可以從Slave訂閱消息,訂閱規則由Broker配置決定。 Consumer每隔30s從Name server獲取topic的最新隊列情況,這意味著Broker不可用時,Consumer最多 需要30s才能感知。 Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向所有關聯的broker發送 心跳,Broker每隔10s掃描所有存活的連接,若某個連接2分鐘內沒有發送心跳數據,則關閉連接; 并向該Consumer Group的所有Consumer發出通知,Group內的Consumer重新分配隊列,然后繼續消費。 當Consumer得到master宕機通知后,轉向slave消費,slave不能保證master的消息100%都同步過來了, 因此會有少量的消息丟失。但是一旦master恢復,未同步過去的消息會被最終消費掉。三、 Rocketmq如何支持分布式事務消息 場景 A(存在DB操作)、B(存在DB操作)兩方需要保證分布式事務一致性,通過引入中間層MQ,A和MQ保持 事務一致性(異常情況下通過MQ反查A接口實現check),B和MQ保證事務一致(通過重試),從而達到 最終事務一致性。 1. MQ與DB一致性原理(兩方事務) MQ消息、DB操作一致性方案: 1)發送消息到MQ服務器,此時消息狀態為SEND_OK。此消息為consumer不可見。 2)執行DB操作;DB執行成功Commit DB操作,DB執行失敗Rollback DB操作。 3)如果DB執行成功,回復MQ服務器,將狀態為COMMIT_MESSAGE;如果DB執行失敗,回復MQ服務器, 將狀態改為ROLLBACK_MESSAGE。注意此過程有可能失敗。 4)MQ內部提供一個名為“事務狀態服務”的服務,此服務會檢查事務消息的狀態,如果發現消息未COMMIT, 則通過Producer啟動時注冊的TransactionCheckListener來回調業務系統,業務系統 在checkLocalTransactionState方法中檢查DB事務狀態,如果成功,則回復COMMIT_MESSAGE, 否則回復ROLLBACK_MESSAGE。 以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的狀態,在MQ服務器內部 是一個數字。 TransactionCheckListener 是在消息的commit或者rollback消息丟失的情況下才會回調。這種消息 丟失只存在于斷網或者rocketmq集群掛了的情況下。當rocketmq集群掛了,如果采用異步刷盤,存在1s內 數據丟失風險,異步刷盤場景下保障事務沒有意義。所以如果要核心業務用Rocketmq解決分布式事務問題, 建議選擇同步刷盤模式。2. 多系統之間數據一致性(多方事務) 當需要保證多方(超過2方)的分布式一致性,上面的兩方事務一致性(通過Rocketmq的事務性消息解決) 已經無法支持。這個時候需要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。1)交易系統創建訂單(往DB插入一條記錄),同時發送訂單創建消息。通過RocketMq事務性消息保證一致性 2)接著執行完成訂單所需的同步核心RPC服務(非核心的系統通過監聽MQ消息自行處理,處理結果不會影響 交易狀態)。執行成功更改訂單狀態,同時發送MQ消息。3.案例分析 1) 單機環境下的事務示意圖 如下為A給B轉賬的例子。 步驟 動作 1 鎖定A的賬戶 2 鎖定B的賬戶 3 檢查A賬戶是否有1元 4 A的賬戶扣減1元 5 給B的賬戶加1元 6 解鎖B的賬戶 7 解鎖A的賬戶 以上過程在代碼層面甚至可以簡化到在一個事物中執行兩條sql語句。2) 分布式環境下事務 和單機事務不同,A、B賬戶可能不在同一個DB中,此時無法像在單機情況下使用事物來實現。此時可以 通過一下方式實現,將轉賬操作分成兩個操作。a) A賬戶 步驟 動作 1 鎖定A的賬戶 2 檢查A賬戶是否有1元 3 A的賬戶扣減1元 4 解鎖A的賬戶b) MQ消息 A賬戶數據發生變化時,發送MQ消息,MQ服務器將消息推送給轉賬系統,轉賬系統來給B賬號加錢。c) B賬戶 步驟 動作 1 鎖定B的賬戶 2 給B的賬戶加1元 3 解鎖B的賬戶四、 順序消息 1. 順序消息缺陷 發送順序消息無法利用集群Fail Over特性消費順序消息的并行度依賴于隊列數量隊列熱點問題,個別隊列 由于哈希不均導致消息過多,消費速度跟不上,產生消息堆積問題遇到消息失敗的消息,無法跳過,當前 隊列消費暫停。2. 原理 produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者注冊消息監聽器 為MessageListenerOrderly,這樣就可以保證消費端只有一個線程去消費消息。注意:把消息發到同一個隊列(queue),不是同一個topic,默認情況下一個topic包括4個queue五、 最佳實踐 1. Producer 1) Topic 一個應用盡可能用一個Topic,消息子類型用tags來標識,tags可以由應用自由設置。只有發送消息 設置了tags,消費方在訂閱消息時,才可以利用tags 在broker做消息過濾。2) key 每個消息在業務層面的唯一標識碼,要設置到 keys 字段,方便將來定位消息丟失問題。 //訂單Id String orderId= "20034568923546"; message.setKeys(orderId);3) 日志 消息發送成功或者失敗,要打印消息日志,務必要打印 send result 和key 字段。4) send send消息方法,只要不拋異常,就代表發送成功。但是發送成功會有多個狀態,在sendResult里定義。 SEND_OK:消息發送成功 FLUSH_DISK_TIMEOUT:消息發送成功,但是服務器刷盤超時,消息已經進入服務器隊列,只有此時服務器 宕機,消息才會丟失 FLUSH_SLAVE_TIMEOUT:消息發送成功,但是服務器同步到Slave時超時,消息已經進入服務器隊列, 只有此時服務器宕機,消息才會丟失 SLAVE_NOT_AVAILABLE:消息發送成功,但是此時slave不可用,消息已經進入服務器隊列,只有此時 服務器宕機,消息才會丟失2. Consumer 1) 冪等 RocketMQ使用的消息原語是At Least Once,所以consumer可能多次收到同一個消息,此時務必做好冪等。2) 日志 消費時記錄日志,以便后續定位問題。3) 批量消費 盡量使用批量方式消費方式,可以很大程度上提高消費吞吐量。

分布式消息中間件-Rocketmq

簡述 消息中間件主要是實現分布式系統中解耦、異步消息、流量銷鋒、日志處理等場景,現在生產中用的 最多的消息隊列有Activemq,rabbitmq,kafka,rocketmq等。分享的版本也是基于3.2.6進行的。JMS規范 rocketmq雖然不完全基于jms規范,但是他參考了jms規范和 CORBA Notification 規范等,可以說是 青出于藍而勝于藍什么是jms呢jms其實就是類似于jdbc的一套接口規范,但不同的是他是面向的消息服務,提供一套標準API接口, 大部分廠商都會參考jms規范,不過我們后面要講到的rocketmq卻沒有嚴格遵守jms規范,后面我們會講到。一些常見的jms廠商有:APACHE開源的ActiveMQ。這里面Activemq這個也是我接觸到的第一個mq,現在 市場份額也是很大的,京東商城采用的就是這個。基本概念 發送者( Sender)也就是消息的生產者,就是創建并發送消息的JMS客戶端接收者( Receiver)也就是消息消費者,接收訂制消息的并按照相應的業務邏輯進行處理,最終將結果反饋給mq的服務端。點對點( Point-to-Point(P2P) )點對點就是一對一的關系,一個消息發出只有一個接受者所處理。每個消息都被發送到一個特定的隊列, 接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。發布訂閱( Publish/Subscribe(Pub/Sub) )1、客戶端將消息發送到主題。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。2、如果你希望發送的消息可以不被做任何處理、或者被一個消息者處理、或者可以被多個消費者 處理的話,那么可以采用Pub/Sub模型消息隊列(Queue) 與隊列名字所暗示的意思不同,消息的接受順序并不一定要與消息的發送順序相同。一旦一個消息被閱讀, 該消息將被從隊列中移走。主題(Topic) 一種支持發送消息給多個訂閱者的機制。發布者(Publisher) 同生產者訂閱者(Subscriber)針對同一主題的多個消費者

點對點

發布訂閱

對象模型 (1) ConnectionFactory創建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory 和TopicConnectionFactory兩種(很顯然是基于點對點和和發布訂閱的兩種方式分別創建連接工廠的)。 可以通過JNDI來查找ConnectionFactory對象。(2) Destination Destination的意思是消息生產者的消息發送目標或者說消息消費者的消息來源。對于消息生產者來說, 它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination 也是某個隊列或主題(即消息來源)。所以,Destination實際上就是兩種類型的對象:Queue、Topic 可以通過JNDI來查找Destination。(3) Connection Connection表示在客戶端和JMS系統之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生 一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。(4) SessionSession是我們操作消息的接口。可以通過session創建生產者、消費者、消息等。Session提供了事務 的功能。當我們需要使用session發送/接收多個消息時,可以將這些發送/接收動作放到一個事務中。同樣, 也分QueueSession和TopicSession。(5) 消息的生產者消息生產者由Session創建,并用于將消息發送到Destination。同樣,消息生產者分兩種類型: QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發送消息。(6) 消息消費者 消息消費者由Session創建,用于接收被發送到Destination的消息。兩種類型:QueueReceiver 和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來創建。 當然,也可以session的creatDurableSubscriber方法來創建持久化的訂閱者。(7) MessageListener消息監聽器。如果注冊了消息監聽器,一旦消息到達,將自動調用監聽器的onMessage方法。我們后面 消息消費還會看到。消息消費 在JMS中,消息的產生和消息是異步的。對于消費來說,JMS的消息者可以通過兩種方式來消費消息。○ 同步 訂閱者或接收者調用receive方法來接收消息,receive方法在能夠接收到消息之前(或超時之前)將一直阻塞○ 異步 訂閱者或接收者可以注冊為一個消息監聽器。當消息到達之后,系統自動調用監聽器的onMessage方法。這邊通過activemq的部分代碼來簡單說明一下上面說道的一些JMS規范public void init(){try {//創建一個鏈接工廠(用戶名,密碼,broker的url地址)connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);//從工廠中創建一個鏈接connection = connectionFactory.createConnection();//開啟鏈接connection.start();//創建一個會話session = connection.createSession(true,Session.SESSION_TRANSACTED);} catch (JMSException e) {e.printStackTrace();}}公共部分:也就是說不管你是消息的生產者還是消息的消費者都需要這些步驟1.首先我們需要創建一個連接工廠,當然這里我們需要輸入用戶名和密碼還有就是broker的url 2.然后我們根據連接工廠創建了一個連接,此刻這個工廠并沒有和broker連接 3.調用start方法就和broker建立了連接,這里我大概解釋一下broker 4.broker:消息隊列核心,相當于一個控制中心,負責路由消息、保存訂閱和連接、消息確認 和控制事務,activemq可以配置多個public void sendMsg(String queueName){try {//創建一個消息隊列(此處也就是在創建Destination)Queue queue = session.createQueue(queueName);//消息生產者MessageProducer messageProducer = null;if(threadLocal.get()!=null){messageProducer = threadLocal.get();}else{messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while(true){Thread.sleep(1000);int num = count.getAndIncrement();//創建一條消息TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:生產消息,count:"+num);//發送消息messageProducer.send(msg);//提交事務session.commit();}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}生產:配置完上面的公共部分我們就迫不及待的把消息生產出來吧,我這邊說的是點對點的方式 1.通過session創建一個Destination,我這邊直接就用了queue了 2.接下來我們需要創建一個消息的生產者 3.我這邊就循環每1s發送一條消息 4.這邊看到我們的消息也是用session來創建的,這里面我們用的是文本的消息類型 5.發送消息 6.提交這次發送,至此我們的消息就發送到了broker上了,用過activemq的同學都知道, activemq提供了一個很好用的界面可以查到你的消息的狀態,包括是否消費等消費:消費我們上面也提到了兩種方式,同步和異步,我這邊準備了兩份代碼分別說明了一下public void doMessage(String queueName){try {//創建DestinationQueue queue = session.createQueue(queueName);MessageConsumer consumer = null;while(true){Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if(msg!=null) {msg.acknowledge();System.out.println(Thread.currentThread().getName()+ ": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement());}else {break;}}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}同步:可以看到消息會一直阻塞到有消息才會繼續1.通過session創建一個Destination,我這邊直接就用了queue了 2.創建了一個Consumer 3.做了一個死循環,類似于ServerSocket的accept方法,我們的receive會阻塞到這里,直到有消息 4.如果消息不為空告知消息消費成功consumer.setMessageListener(MessageListener { public void onMessage(Message msg) { try { String message = ((TextMessage) msg).getText(); if(msg != null){msg.acknowledgeSystem.out.println("成功消費消息:"+message);} } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } );異步:前兩部和上面是一樣的,注冊了一個監聽接口的實現,當有消息時就調用onMessage的實現, 后面就一樣了

RocketMQ介紹

簡介 rocketmq是阿里巴巴開源的一款分布式的消息中間件,他源于jms規范但是不遵守jms規范。如果你了用過 其他mq并且了解過rocketmq,就知道rocketmq天生就是分布式的,可以說是broker、provider、consumer 等各種分布式。 1.能夠保證嚴格的消息順序(需要集群的支持) 2.提供豐富的消息拉取模式 3.高效的訂閱者水平擴展能力(通過一個consumerGroup的方式做到consumer的方便擴容) 4.實時的消息訂閱機制(消息的實時推送,類似于上面咱們的異步消費的方式) 5.億級消息堆積能力(輕松完成系統銷鋒)發展歷史 一、 Metaq(Metamorphosis) 1.x 由開源社區 killme2008 維護,開源社區非常活躍。 https://github.com/killme2008/Metamorphosis二、 Metaq 2.x 于 2012 年 10 月份上線,在淘寶內部被廣泛使用三、改名為RocketMQ 公司內部開源共建的原則,rocketmq只維護了核心功能,可以方面每個SUB(業務單元)定制,當然阿里內部 之所以提供高效的新能出了rocketmq本身之外還依賴于另外一個產品(oceanbase陽振坤) https://github.com/apache/rocketmq 當前版本為4.2.0-SNAPSHOT選擇的理由1.強調集群無單點,可擴展,任意一點高可用,水平可擴展 方便集群配置,而且容易擴展(橫向和縱向),通過slave的方式每一點都可以實現高可用2.支持上萬個隊列,順序消息順序消費是實現在同一隊列的,如果高并發的情況就需要隊列的支持,rocketmq可以滿足上萬個隊列 同時存在3.任性定制你的消息過濾rocketmq提供了兩種類型的消息過濾,也可以說三種可以通過topic進行消息過濾、可以通過tag進行 消息過濾、還可以通過filter的方式任意定制過濾4.消息的可靠性(無Buffer,持久化,容錯,回溯消費) rocketmq的所有消息都是持久化的,生產者本身可以進行錯誤重試,發送者也會按照時間階梯的方式進行 消息重發,5.海量消息堆積能力,消息堆積后,寫入低延遲 對于consumer,如果是集群方式一旦master返現消息堆積會向consumer下發一個重定向指令, 此時consumer就可以從slave進行數據消費了6.消息失敗重試機制7.定時消費 出了上面的配置,在發送消息是也可以針對message設置setDelayTimeLevel8.活躍的開源社區 現在rocketmq成為了apache的一款開源產品,活躍度也是不容懷疑的9.成熟度(經過雙十一考驗) 針對本身的成熟度,我們看看這么多年的雙十一就可想而知了專有術語NameServer 這里我們可以理解成類似于zk的一個注冊中心,而且rocketmq最初也是基于zk作為注冊中心的,現在相當于 為rocketmq自定義了一個注冊中心,代碼不超過1000行。RocketMQ 有多種配置方式可以令客戶端找到 Name Server, 然后通過 Name Server 再找到 Broker,客戶端提供http和ip+端口號的兩種方式,推薦 使用http的方式可以實現nameserver的熱部署Push ConsumerConsumer 的一種,應用通常通過 Consumer 對象注冊一個 Listener 接口,一旦收到消息, Consumer 對象立刻回調 Listener 接口方法,類似于activemq的方式Pull Consumer Consumer 的一種,應用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制Producer Group 一類producer的集合名稱,這類producer通常發送一類消息,且發送邏輯一致Consumer Group同上,consumer的集合名稱Broker 消息中轉的角色,負責存儲消息(實際的存儲是調用的store組件完成的),轉發消息,一般也成為server, 通jms中的providerMessage Filter 可以實現高級的自定義的消息過濾,java編寫Master/Slave 集群的主從關系,broker的name相同,brokerid=0的為主,大于0的為從部署方式NameServer :類似云zk的集群,主要是維護了broker的相關內容,進行存取;節點之間無任何數據同步 1、接收broker的注冊 2、Producer獲取topic下所有的BrokerQueue,put消息 3、Consumer獲取topic下所有的BrokerQueue,get消息Broker : 部署相對復雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能 對應Master,Master和Slave的對應關系通過制定相同的BrokerName來確定,通過制定BrokerId來區分 主從,如果是0則為Master,如果大于0則為Slave。Master也可以部署多個。每個Broker與Name Server 集群中的所有節點建立長連接,定時注冊Topic信息到所有的NameServerProducer: 與Name sever集群中的其中一個節點(隨意選擇)建立長連接,定期的從Name Server取Topic路由信息, 并向提供Topic服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可以集群部署。Consumer: 與Name Server集群中的其中一個節點(隨機選擇)建立長連接,定期從Name Server取Topic路由信息,并向 提供Topic的Master、Slave簡歷長連接,且定時向Master、Slave發送心跳,Consumer既可以從Master 訂閱消息,也可以從Slave訂閱消息,訂閱規則有Broker配置決定。邏輯部署Producer Group:用來表示一個發送消息應用,一個Producer Group下辦好多個Producer實例,可是多臺機器,也可以是 一臺機器的多個線程,或者一個進程的多個Producer對象,一個Producer Group可以發送多個Topic 消息,Producer Group的作用如下: 1、標識一類Producer(分布式) 2、可以通過運維工具查詢這個發送消息應用有多少個Producer 3、發送分布式事務消息時,如果Producer中途意外當即,Broker會主動回調Producer Group內的 任意一臺機器來確認事務狀態。 Consumer Group: 用來表示一個消費消息應用,一個Consumer Group下包含多個Consumer實例,可以是多臺機器,也可以是 多個進程,或者是一個進程的多個Consumer對象。一個Consumer Group下的多個Consumer以均攤方式 消費消息。如果設置為廣播方式,那么這個Consumer Group下的每個實例都消費全量數據。單Master模式 只有一個 Master節點 優點:配置簡單,方便部署 缺點:這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用多Master模式 一個集群無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master 優點:配置簡單,單個Master 宕機或重啟維護對應用無影響,在磁盤配置為RAID10 時,即使機器宕機 不可恢復情況下,由與 RAID10磁盤非常可靠,消息也不會丟(異步刷盤丟失少量消息,同步刷盤一條 不丟)。性能最高。多 Master 多 Slave 模式,異步復制缺點:單臺機器宕機期間,這臺機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受到 受到影響多Master多Slave模式(異步復制)每個 Master 配置一個 Slave,有多對Master-Slave, HA,采用異步復制方式,主備有短暫消息延遲, 毫秒級。 優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,因為Master 宕機后,消費者仍然可以 從 Slave消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。 缺點: Master 宕機,磁盤損壞情況,會丟失少量消息。多Master多Slave模式(同步雙寫)每個 Master 配置一個 Slave,有多對Master-Slave, HA采用同步雙寫方式,主備都寫成功, 向應用返回成功。 優點:數據與服務都無單點, Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高 缺點:性能比異步復制模式略低,大約低 10%左右,發送單個消息的 RT會略高。目前主宕機后,備機不能 自動切換為主機,后續會支持自動切換功能特性使用 Quick start Producer:/*** Producer,發送消息* */ public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("pay_topic_01");producer.setNamesrvAddr("100.8.8.88:9876");producer.start();for (int i = 0; i < 1000; i++) {try {Message msg = new Message("TopicTest",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes()// body);SendResult sendResult = producer.send(msg);System.out.println(sendResult);}catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}producer.shutdown();}1、創建一個Producer的,這里我們看到rocketmq的創建producer很簡單只輸入一個Group Name名字就 可以,不向activemq那么復雜 2、第二步就是制定Name Server的地址,這里注意兩點,一個就是nameserver的默認端口是9876,另一個 就是多個nameserver集群用分號來分割 3、我這邊循環發送了1000個消息 4、消息創建也很簡單,第一個參數是topic,第二個就是tags(多個tag用||連接),第三個參數是消息內容 5、調用send方法就能發送成功了,不用想actimemq那樣需要commitConsumer:/*** Consumer,訂閱消息*/ public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");consumer.setNamesrvAddr("100.8.8.88:9876");/*** 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費<br>* 如果非第一次啟動,那么按照上次消費的位置繼續消費*/consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer Started.");}1、前兩步和Producer是一樣的 2、這里可以設置從那個位置開始讀取消息,一般我們會從頭部開始讀取消費,系統中注意去重,也就是冪等 3、訂閱topic,第一個參數是topic名字,第二個是tag,如果為*的就是全部消息 4、注冊一個監聽,如果有消息就會實時的推送到Consumer,調用consumeMessage進行消費,這里我們 看到msgs是一個List,默認每次推送的是一條消息。 5、進行消息的消費邏輯,消費成功后會返回CONSUME_SUCCESS狀態消息過濾 RocketMq的消息過濾是從訂閱的時候開始的,我們看到剛才的例子都是通過topic的tags進行的過濾,這個 要求Producer發送的事后指定tags,Consumer在訂閱消費的時候指定的tags才會對消息進行過濾,這種是 簡單的過濾方式,不過也可以滿足我們大部分的消息過濾。順序消息 因為一些消息可以需要按照順序消費才有意義,比如某例子現在是異步去執行的當然現在是采用的定時的 方式,比如我們把現在的模式套上來,看看順序消費是一個什么樣子。訂單創建》分批》打包》外發。。。 。,rocketmq實現的方式也很簡單,只要我們把這些消息都放到一個隊列中就能夠做到順序消費了,實際 上rocketmq的順序消費有兩種方式,一種是普通的順序消費(多Master多Slave的異步復制),一種是嚴格 的順序消費(多Master多Slave的同步雙寫)。/*** Producer,發送順序消息*/ public class Producer {public static void main(String[] args) {try {MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("100.8.8.88:9876");producer.start();String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };for (int i = 0; i < 100; i++) {// 訂單ID相同的消息要有序int orderId = i % 10;Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.println(sendResult);}producer.shutdown();}catch (MQClientException e) {e.printStackTrace();}catch (RemotingException e) {e.printStackTrace();}catch (MQBrokerException e) {e.printStackTrace();}catch (InterruptedException e) {e.printStackTrace();}}1、首先要保障消息要同時在一個topic中 2、要保障要發送的消息有相同的tag 3、在發送時要保障將數據發送到同一個隊列(queue),我們這里采用的取模的方式 rocketmq可以同時支持上完個隊列,這個也是為了順序消費來考慮的事務消息 比如有兩個賬戶張三、李四,張三要給李四轉10塊錢,以下都在同一個事務中進行,鎖定是通過事務中完成的1、鎖定張三和李四的賬戶 2、判斷張三的賬戶是否大于等于10塊錢,如果大于等于則繼續,小于則返回,我們這里只討論大于等于的 3、從張三的賬戶上減去10塊 4、向李四的賬戶增加10塊 5、解鎖賬戶完成交易update account set amount = amount - 100 where userNo='zhangsan' and amount >=10 update account set amount = amount + 100 where userNo='lisi'如果是分布式事務就要考慮到兩個用戶賬戶的一致性,我們就從分布式的角度來分析一下 1、鎖定張三的賬戶,同時通過網絡鎖定李四的賬戶(可以理解成凍結金額) 2、判斷張三的賬戶是否大于等于10塊錢,如果大于等于則繼續,小于則返回,我們這里只討論大于等于的 3、從張三的賬戶上減去10塊 4、通過網絡向李四的賬戶增加10塊 5、解鎖張三賬戶完成交易,通過網絡解鎖李四的賬戶,時間基本上是累計的通過rocketmq怎么做這個事兒呢,首先通過rocketmq做這個事兒我們就要分清一下角色,張三為 事務的發起者也就是消息的發送者,相對李四就是消息的消費者了,rocketmq可以理解成中間賬戶, 默認Consumer都會成功,如果不成功官方推薦人工介入。1、判斷張三的賬戶金額大于10 2、同時張三的賬戶減去10 3、同時丟出一個mq消息給rocketmq,兩個要確保放在一個db事務中(此時的消息只是處于prapared階段, 不會被Consumer所消費) 4、如果本地事務執行成功則向rocketmq發送commit 5、如果第四部出現了本Consumer宕機,也就是rocketmq沒有收到commit,此刻消息是是未知,所以 他會向任意一臺Producer來確認當前消息的狀態 5、如果第四部出現了本Consumer宕機,也就是rocketmq沒有收到commit,此刻消息是是未知,所以他會 向任意一臺Producer來確認當前消息的狀態 6、從此保障了本地賬戶和rocketmq的一致性/*** 發送事務消息例子* */ public class TransactionProducer {public static void main(String[] args) throws MQClientException, InterruptedException {TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("100.8.8.88:9876");// 事務回查最小并發數producer.setCheckThreadPoolMinSize(2);// 事務回查最大并發數producer.setCheckThreadPoolMaxSize(2);// 隊列數producer.setCheckRequestHoldMax(2000);producer.setTransactionCheckListener(transactionCheckListener);producer.start();String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" };TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();for (int i = 0; i < 100; i++) {try {Message msg =new Message("TopicTest", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes());SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);System.out.println(sendResult);}catch (MQClientException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {Thread.sleep(1000);}producer.shutdown();} }本地事務:/*** 執行本地事務*/ public class TransactionExecuterImpl implements LocalTransactionExecuter {private AtomicInteger transactionIndex = new AtomicInteger(1);@Overridepublic LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {int value = transactionIndex.getAndIncrement();if (value == 0) {throw new RuntimeException("Could not find db");}else if ((value % 5) == 0) {return LocalTransactionState.ROLLBACK_MESSAGE;}else if ((value % 4) == 0) {return LocalTransactionState.COMMIT_MESSAGE;}return LocalTransactionState.UNKNOW;} }回調檢查點:/*** 未決事務,服務器回查客戶端*/ public class TransactionCheckListenerImpl implements TransactionCheckListener {private AtomicInteger transactionIndex = new AtomicInteger(0);@Overridepublic LocalTransactionState checkLocalTransactionState(MessageExt msg) {System.out.println("server checking TrMsg " + msg.toString());int value = transactionIndex.getAndIncrement();if ((value % 6) == 0) {throw new RuntimeException("Could not find db");}else if ((value % 5) == 0) {return LocalTransactionState.ROLLBACK_MESSAGE;}else if ((value % 4) == 0) {return LocalTransactionState.COMMIT_MESSAGE;}return LocalTransactionState.UNKNOW;}點對點/廣播 點對點和發布訂閱的兩種模式上面我們已經說了很多,這里只要我們在consumer里面配置MessageModel就 可以做到兩種模式的消費,//發布訂閱consumer.setMessageModel(MessageModel.BROADCASTING);//集群消費(默認)//consumer.setMessageModel(MessageModel.CLUSTERING);推送/拉取 采用消息推送的模式,注冊監聽,當有消息產生時就會實時的推送到Consumer進行消費, import java.util.HashMap; import java.util.Map; import java.util.Set;import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageQueue;/*** PullConsumer,訂閱消息*/ public class PullConsumer {private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();public static void main(String[] args) throws MQClientException {DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");consumer.start();Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");for (MessageQueue mq : mqs) {System.out.println("Consume from the queue: " + mq);SINGLE_MQ: while (true) {try {PullResult pullResult =consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);System.out.println(pullResult);putMessageQueueOffset(mq, pullResult.getNextBeginOffset());switch (pullResult.getPullStatus()) {case FOUND:// TODObreak;case NO_MATCHED_MSG:break;case NO_NEW_MSG:break SINGLE_MQ;case OFFSET_ILLEGAL:break;default:break;}}catch (Exception e) {e.printStackTrace();}}}consumer.shutdown();}private static void putMessageQueueOffset(MessageQueue mq, long offset) {offseTable.put(mq, offset);}private static long getMessageQueueOffset(MessageQueue mq) {Long offset = offseTable.get(mq);if (offset != null)return offset;return 0;}}消息回溯 回溯消費是指 Consumer 已經消費成功的消息,由于業務上需求需要重新消費,Broker 在Consumer 投遞 成功消息后,消息仍然需要保留。并且重新消費一般是按照時間維度,例如由于 Consumer 系統故障,恢復 后需要重新消費 1 小時前的數據,那舉 Broker 要提供一種機制,可以按照時間維度來回退消費進度 RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向后回溯 操作: mqadmin resetOffsetByTime

Kafka、RabbitMQ、RocketMQ消息中間件的對比

分布式系統中,我們廣泛運用消息中間件進行系統間的數據交換,便于異步解耦。產品 RocketMQ (MetaQ的內核) 那么,消息中間件性能究竟哪家強?中間件測試組對常見的三類消息產品(Kafka、RabbitMQ、RocketMQ)做了性能比較。Kafka是LinkedIn開源的分布式發布-訂閱消息系統,目前歸屬于Apache定級項目。Kafka主要特點是基于Pull 的模式來處理消息消費,追求高吞吐量,一開始的目的就是用于日志收集和傳輸。0.8版本開始支持復制, 不支持事務,對消息的重復、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務。RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基于AMQP協議來實現。AMQP的主要特征是面向消息、 隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、 穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。RocketMQ是阿里開源的消息中間件,它是純Java開發,具有高吞吐量、高可用性、適合大規模分布式系統應用 的特點。RocketMQ思路起源于Kafka,但并不是Kafka的一個Copy,它對消息的可靠傳輸及事務性做了優化, 目前在阿里集團被廣泛應用于交易、充值、流計算、消息推送、日志流式處理、binglog分發等場景。在同步發送場景中,三個消息中間件的表現區分明顯:Kafka的吞吐量高達17.3w/s,不愧是高吞吐量消息中間件的行業老大。這主要取決于它的隊列模式保證了 寫磁盤的過程是線性IO。此時broker磁盤IO已達瓶頸。RocketMQ也表現不俗,吞吐量在11.6w/s,磁盤IO %util已接近100%。RocketMQ的消息寫入內存后即 返回ack,由單獨的線程專門做刷盤的操作,所有的消息均是順序寫文件。RabbitMQ的吞吐量5.95w/s,CPU資源消耗較高。它支持AMQP協議,實現非常重量級,為了保證消息的可靠性 在吞吐量上做了取舍。我們還做了RabbitMQ在消息持久化場景下的性能測試,吞吐量在2.6w/s左右。測試結論 在服務端處理同步發送的性能上,Kafka>RocketMQ>RabbitMQ。但是,作為經受過歷次雙十一洗禮的RocketMQ,在互聯網應用場景中更有它優越的一面。

rocketmq詳解

簡介 官方簡介:RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點: 1.能夠保證嚴格的消息順序 2.提供豐富的消息拉取模式 3.高效的訂閱者水平擴展能力 4.實時的消息訂閱機制 5.億級消息堆積能力

三、特性 1. nameserver 相對來說,nameserver的穩定性非常高。原因有二: 1 、nameserver互相獨立,彼此沒有通信關系,單臺nameserver掛掉,不影響其他nameserver,即使 全部掛掉,也不影響業務系統使用。無狀態 2 、nameserver不會有頻繁的讀寫,所以性能開銷非常小,穩定性很高。2. broker 與nameserver關系 連接單個broker和所有nameserver保持長連接 心跳心跳間隔:每隔30秒(此時間無法更改)向所有nameserver發送心跳,心跳包含了自身的topic配置信息。心跳超時:nameserver每隔10秒鐘(此時間無法更改),掃描所有還存活的broker連接,若某個連接2分鐘內(當前時間與最后更新時間差值超過2分鐘,此時間無法更改)沒有發送心跳數據,則斷開連接。斷開時機:broker掛掉;心跳超時導致nameserver主動關閉連接動作:一旦連接斷開,nameserver會立即感知,更新topc與隊列的對應關系,但不會通知生產者和消費者負載均衡一個topic分布在多個broker上,一個broker可以配置多個topic,它們是多對多的關系。 如果某個topic消息量很大,應該給它多配置幾個隊列,并且盡量多分布在不同broker上,減輕某個broker的壓力。topic消息量都比較均勻的情況下,如果某個broker上的隊列越多,則該broker壓力越大。可用性由于消息分布在各個broker上,一旦某個broker宕機,則該broker上的消息讀寫都會受到影響。所以rocketmq提供了master/slave的結構,salve定時從master同步數據,如果master宕機,則slave提供消費服務,但是不能寫入消息,此過程對應用透明,由rocketmq內部解決。這里有兩個關鍵點:1.一旦某個broker master宕機,生產者和消費者多久才能發現?受限于rocketmq的網絡連接機制,默認情況下,最多需要30秒,但這個時間可由應用設定參數來縮短時間。這個時間段內,發往該broker的消息都是失敗的,而且該broker的消息無法消費,因為此時消費者不知道該broker已經掛掉。2.消費者得到master宕機通知后,轉向slave消費(重定向,對于2次開發者透明),但是slave不能保證master的消息100%都同步過來了,因此會有少量的消息丟失。但是消息最終不會丟的,一旦master恢復,未同步過去的消息會被消費掉。可靠性1.所有發往broker的消息,有同步刷盤和異步刷盤機制,總的來說,可靠性非常高2.同步刷盤時,消息寫入物理文件才會返回成功,因此非常可靠3. 異步刷盤時,只有機器宕機,才會產生消息丟失,broker掛掉可能會發生,但是機器宕機崩潰是很少發生的,除非突然斷電消息清理 1.掃描間隔默認10秒,由broker配置參數cleanResourceInterval決定 2.空間閾值物理文件不能無限制的一直存儲在磁盤,當磁盤空間達到閾值時,不再接受消息,broker打印出日志,消息發送失敗,閾值為固定值85% 3.清理時機默認每天凌晨4點,由broker配置參數deleteWhen決定;或者磁盤空間達到閾值 4. 文件保留時長默認72小時,由broker配置參數fileReservedTime決定讀寫性能 1.文件內存映射方式操作文件,避免read/write系統調用和實時文件讀寫,性能非常高 2.永遠一個文件在寫,其他文件在讀 3.順序寫,隨機讀 4.將消息內容直接輸出到socket管道,避免系統調用系統特性 1.大內存,內存越大性能越高,否則系統swap會成為性能瓶頸 2.IO密集 3.cpu load高,使用率低,因為cpu占用后,大部分時間在IO WAIT 4.磁盤可靠性要求高,為了兼顧安全和性能,采用RAID10陣列 5.磁盤讀取速度要求快,要求高轉速大容量磁盤3. 消費者 與nameserver關系 1.連接單個消費者和一臺nameserver保持長連接,定時查詢topic配置信息,如果該nameserver掛掉,消費者會自動連接下一個nameserver,直到有可用連接為止,并能自動重連。 2.心跳與nameserver沒有心跳 3.輪詢時間消費者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味著某個broker如果宕機,客戶端最多要30秒才能感知。該時間由DefaultMQPushConsumer的pollNameServerInteval參數決定,可手動配置。與broker關系 1.連接單個消費者和該消費者關聯的所有broker保持長連接。 2.心跳默認情況下,消費者每隔30秒向所有broker發送心跳,該時間由DefaultMQPushConsumer的heartbeatBrokerInterval參數決定,可手動配置。broker每隔10秒鐘(此時間無法更改),掃描所有還存活的連接,若某個連接2分鐘內(當前時間與最后更新時間差值超過2分鐘,此時間無法更改)沒有發送心跳數據,則關閉連接,并向該消費者分組的所有消費者發出通知,分組內消費者重新分配隊列繼續消費 3. 斷開時機:消費者掛掉;心跳超時導致broker主動關閉連接動作:一旦連接斷開,broker會立即感知到,并向該消費者分組的所有消費者發出通知,分組內消費者重新分配隊列繼續消費負載均衡集群消費模式下,一個消費者集群多臺機器共同消費一個topic的多個隊列,一個隊列只會被一個消費者消費。如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續消費。消費機制 1.本地隊列 消費者不間斷的從broker拉取消息,消息拉取到本地隊列,然后本地消費線程消費本地消息隊列,只是一個 異步過程,拉取線程不會等待本地消費線程,這種模式實時性非常高(本地消息隊列達到解耦的效果,響應 時間減少)。對消費者對本地隊列有一個保護,因此本地消息隊列不能無限大,否則可能會占用大量內存, 本地隊列大小由DefaultMQPushConsumer的pullThresholdForQueue屬性控制,默認1000,可手動設置。 2.輪詢間隔消息拉取線程每隔多久拉取一次?間隔時間由DefaultMQPushConsumer的pullInterval屬性控制,默認 為0,可手動設置。 3.消息消費數量 監聽器每次接受本地隊列的消息是多少條?這個參數由DefaultMQPushConsumer 的consumeMessageBatchMaxSize屬性控制,默認為1,可手動設置。消費進度存儲 每隔一段時間將各個隊列的消費進度存儲到對應的broker上,該時間由DefaultMQPushConsumer 的persistConsumerOffsetInterval屬性控制,默認為5秒,可手動設置。如果一個topic在某broker上有3個隊列,一個消費者消費這3個隊列,那么該消費者和這個broker有 幾個連接?一個連接,消費單位與隊列相關,消費連接只跟broker相關,事實上,消費者將所有隊列的消息拉取任務 放到本地的隊列,挨個拉取,拉取完畢后,又將拉取任務放到隊尾,然后執行下一個拉取任務4. 生產者與nameserver關系 1.連接 單個生產者者和一臺nameserver保持長連接,定時查詢topic配置信息,如果該nameserver掛掉,生產者會 自動連接下一個nameserver,直到有可用連接為止,并能自動重連。 2.輪詢時間 默認情況下,生產者每隔30秒從nameserver獲取所有topic的最新隊列情況,這意味著某個broker 如果宕機,生產者最多要30秒才能感知,在此期間,發往該broker的消息發送失敗。該時間 由DefaultMQProducer的pollNameServerInteval參數決定,可手動配置。 3.心跳 與nameserver沒有心跳與broker關系 1.連接 單個生產者和該生產者關聯的所有broker保持長連接。 2.心跳 默認情況下,生產者每隔30秒向所有broker發送心跳,該時間由DefaultMQProducer 的heartbeatBrokerInterval參數決定,可手動配置。broker每隔10秒鐘(此時間無法更改),掃描所有 還存活的連接,若某個連接2分鐘內(當前時間與最后更新時間差值超過2分鐘,此時間無法更改)沒有發送 心跳數據,則關閉連接。 3.連接斷開 移除broker上的生產者信息負載均衡生產者時間沒有關系,每個生產者向隊列輪流發送消息四、Broker集群配置方式及優缺點 ### 先啟動 NameServer,例如機器 IP 為:172.16.8.106:9876 nohup sh mqnamesrv & ### 在機器 A,啟動第一個 Master nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/ broker-a.properties & ### 在機器 B,啟動第二個 Master nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-noslave/ broker-b.properties &3. 多 Master 多 Slave 模式,異步復制每個 Master 配置一個 Slave,有多對Master-Slave,HA 采用異步復制方式,主備有短暫消息延遲, 毫秒級優點:即使磁盤損壞,消息丟失的非常少,且消息實時性不會受影響,因為 Master 宕機后,消費者 仍然可以從 Slave 消費,此過程對應用透明。不需要人工干預。性能同多 Master 模式幾乎一樣。缺點:Master 宕機,磁盤損壞情況,會丟失少量消息。### 先啟動 NameServer,例如機器 IP 為:172.16.8.106:9876 nohup sh mqnamesrv &### 在機器 A,啟動第一個 Master nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/ broker-a.properties &### 在機器 B,啟動第二個 Master nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/ broker-b.properties & ### 在機器 C,啟動第一個 Slave nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/ broker-a-s.properties &### 在機器 D,啟動第二個 Slave nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-async/ broker-b-s.properties &4. 多 Master 多 Slave 模式,同步雙寫每個 Master 配置一個 Slave,有多對Master-Slave,HA 采用同步雙寫方式,主備都寫成功, 向應用返回成功。優點:數據與服務都無單點,Master宕機情況下,消息無延遲,服務可用性與數據可用性都非常高缺點:性能比異步復制模式略低,大約低 10%左右,發送單個消息的 RT 會略高。目前主宕機后, 備機不能自動切換為主機,后續會支持自動切換功能。### 先啟動 NameServer,例如機器 IP 為:172.16.8.106:9876 nohup sh mqnamesrv &### 在機器 A,啟動第一個 Master nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/ broker-a.properties &### 在機器 B,啟動第二個 Master nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/ broker-b.properties &### 在機器 C,啟動第一個 Slave nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/ broker-a-s.properties &### 在機器 D,啟動第二個 Slave nohup sh mqbroker -n 172.16.8.106:9876 -c$ROCKETMQ_HOME/conf/2m-2s-sync/ broker-b-s.properties &

?

總結

以上是生活随笔為你收集整理的Rocketmq原理最佳实践的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。