ActiveMQ的几种集群配置
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ的几种集群配置
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
ActiveMQ是一款功能強(qiáng)大的消息服務(wù)器,它支持許多種開發(fā)語言,例如Java, C, C++, C#等等。企業(yè)級(jí)消息服務(wù)器無論對(duì)服務(wù)器穩(wěn)定性還是速度,要求都很高,而ActiveMQ的分布式集群則能很好的滿足這一需求,下面說說ActiveMQ的幾種集群配置。Queue consumer clusters此集群讓多個(gè)消費(fèi)者同時(shí)消費(fèi)一個(gè)隊(duì)列,若某個(gè)消費(fèi)者出問題無法消費(fèi)信息,則未消費(fèi)掉的消息將被發(fā)給其他正常的消費(fèi)者,結(jié)構(gòu)圖如下:
一、activeMQ主要的幾類部署方式比較1、默認(rèn)的單機(jī)部署(kahadb)activeMQ的默認(rèn)存儲(chǔ)的單機(jī)方式,以本地kahadb文件的方式存儲(chǔ),所以性能指標(biāo)完全依賴本地磁盤IO,不能提供高可用。2、基于zookeeper的主從(levelDB Master/Slave)5.9.0新推出的主從實(shí)現(xiàn),基于zookeeper來選舉出一個(gè)master,其他節(jié)點(diǎn)自動(dòng)作為slave實(shí)時(shí)同步消息。因?yàn)橛袑?shí)時(shí)同步數(shù)據(jù)的slave的存在,master不用擔(dān)心數(shù)據(jù)丟失,所以leveldb會(huì)優(yōu)先采用內(nèi)存存儲(chǔ)消息,異步同步到磁盤。所以該方式的activeMQ讀寫性能都最好,特別是寫性能能夠媲美非持久化消息。優(yōu)點(diǎn):實(shí)現(xiàn)高可用和數(shù)據(jù)安全性能較好缺點(diǎn):因?yàn)檫x舉機(jī)制要超過半數(shù),所以最少需要3臺(tái)節(jié)點(diǎn),才能實(shí)現(xiàn)高可用。3、基于共享數(shù)據(jù)庫的主從(Shared JDBC Master/Slave)可以基于postgres、mysql、oracle等常用數(shù)據(jù)庫。每個(gè)節(jié)點(diǎn)啟動(dòng)都會(huì)爭(zhēng)搶數(shù)據(jù)庫鎖,從而保證master的唯一性,其他節(jié)點(diǎn)作為備份,一直等待數(shù)據(jù)庫鎖的釋放。因?yàn)樗邢⒆x寫,其實(shí)都是數(shù)據(jù)庫操作,activeMQ節(jié)點(diǎn)本身壓力很小,性能完全取決于數(shù)據(jù)庫性能。優(yōu)點(diǎn):實(shí)現(xiàn)高可用和數(shù)據(jù)安全簡(jiǎn)單靈活,2臺(tái)節(jié)點(diǎn)就可以實(shí)現(xiàn)高可用缺點(diǎn):穩(wěn)定性依賴數(shù)據(jù)庫性能依賴數(shù)據(jù)庫
ActiveMQ 集群配置 高可用自從activemq5.9.0開始,activemq的集群實(shí)現(xiàn)方式取消了傳統(tǒng)的Pure Master Slave方式,增加了基于zookeeper+leveldb的實(shí)現(xiàn)方式,其他兩種方式:目錄共享和數(shù)據(jù)庫共享依然存在。1、Master-Slave部署方式 1)、Shared Filesystem Master-Slave方式 2)、Shared Database Master-Slave方式3)、Replicated LevelDB Store方式第一種方案同樣支持N個(gè)AMQ實(shí)例組網(wǎng),但由于他是基于kahadb存儲(chǔ)策略,亦可以部署在分布式文件系統(tǒng)上,應(yīng)用靈活、高效且安全。 第二種方案與shared filesystem方式類似,只是共享的存儲(chǔ)介質(zhì)由文件系統(tǒng)改成了數(shù)據(jù)庫而已,支持N個(gè)AMQ實(shí)例組網(wǎng),但他的性能會(huì)受限于數(shù)據(jù)庫; 第三種方案是ActiveMQ5.9以后才新增的特性,使用ZooKeeper協(xié)調(diào)選擇一個(gè)node作為master。被選擇的master broker node開啟并接受客戶端連接。 其他node轉(zhuǎn)入slave模式,連接master并同步他們的存儲(chǔ)狀態(tài)。其他node轉(zhuǎn)入slave模式,連接master并同步他們的存儲(chǔ)狀態(tài)。如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網(wǎng)絡(luò)中并連接master進(jìn)入slave mode。至于為什么是2-1,熟悉Zookeeper的應(yīng)該知道,有一個(gè)node要作為觀擦者存在。
Shared Filesystem Master-Slave方式 shared filesystem Master-Slave部署方式主要是通過共享存儲(chǔ)目錄來實(shí)現(xiàn)master和slave的熱備,所有的ActiveMQ應(yīng)用都在不斷地獲取共享目錄的控制權(quán),哪個(gè)應(yīng)用搶到了控制權(quán),它就成為master。 多個(gè)共享存儲(chǔ)目錄的應(yīng)用,誰先啟動(dòng),誰就可以最早取得共享目錄的控制權(quán)成為master,其他的應(yīng)用就只能作為slave。 一、下載activeMQ解壓并復(fù)制3份二、修改配置文件conf下面 activemq.xml如下 ,3份文件broker都修改為一致 其中dataDirectory 是 數(shù)據(jù)存儲(chǔ)共享目錄地址修改 kahaDB目錄地址為數(shù)據(jù)存儲(chǔ)共享目錄地址修改openwire 的tcp 連接地址端口分別為 61616 61626 61636 二 、修改conf 下面的 jetty.xml文件,修改管理界面端口 8161 8162 8163 方便測(cè)試查看啟動(dòng)的那一個(gè)activemq服務(wù)三、啟動(dòng) 成功了一個(gè)后面的將阻塞等待獲取鎖
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver {public static void main(String[] args) {// ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接ConnectionFactory connectionFactory;// Connection :JMS 客戶端到JMS Provider 的連接Connection connection = null;// Session: 一個(gè)發(fā)送或接收消息的線程Session session;// Destination :消息的目的地;消息發(fā)送給誰.Destination destination;// 消費(fèi)者,消息接收者M(jìn)essageConsumer consumer;connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61626,tcp://localhost:61636)");try {// 構(gòu)造從工廠得到連接對(duì)象connection = connectionFactory.createConnection();// 啟動(dòng)connection.start();// 獲取操作連接session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);// 獲取session注意參數(shù)值xingbo.xu-queue是一個(gè)服務(wù)器的queue,須在在ActiveMq的console配置destination = session.createQueue("FirstQueue");consumer = session.createConsumer(destination);while (true) {//設(shè)置接收者接收消息的時(shí)間,為了便于測(cè)試,這里誰定為100sTextMessage message = (TextMessage) consumer.receive(100000);if (null != message) {System.out.println("收到消息" + message.getText());} else {break;}}} catch (Exception e) {e.printStackTrace();} finally {try {if (null != connection)connection.close();} catch (Throwable ignore) {}}}
}
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {private static final int SEND_NUMBER = 5;public static void main(String[] args) {// ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接ConnectionFactory connectionFactory;// Connection :JMS 客戶端到JMS Provider 的連接Connection connection = null;// Session: 一個(gè)發(fā)送或接收消息的線程Session session;// Destination :消息的目的地;消息發(fā)送給誰.Destination destination;// MessageProducer:消息發(fā)送者M(jìn)essageProducer producer;// TextMessage message;// 構(gòu)造ConnectionFactory實(shí)例對(duì)象,此處采用ActiveMq的實(shí)現(xiàn)jarconnectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:61626,tcp://localhost:61636)");try {// 構(gòu)造從工廠得到連接對(duì)象connection = connectionFactory.createConnection();// 啟動(dòng)connection.start();// 獲取操作連接session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);// 獲取session注意參數(shù)值xingbo.xu-queue是一個(gè)服務(wù)器的queue,須在在ActiveMq的console配置destination = session.createQueue("FirstQueue");// 得到消息生成者【發(fā)送者】producer = session.createProducer(destination);// 設(shè)置不持久化,此處學(xué)習(xí),實(shí)際根據(jù)項(xiàng)目決定 ---- 集群此處必須持久化// producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 構(gòu)造消息,此處寫死,項(xiàng)目就是參數(shù),或者方法獲取sendMessage(session, producer);session.commit();} catch (Exception e) {e.printStackTrace();} finally {try {if (null != connection)connection.close();} catch (Throwable ignore) {}}}public static void sendMessage(Session session, MessageProducer producer)throws Exception {for (int i = 1; i <= SEND_NUMBER; i++) {TextMessage message = session.createTextMessage("ActiveMq 發(fā)送的消息" + i);// 發(fā)送消息到目的地方System.out.println("發(fā)送消息:" + "ActiveMq 發(fā)送的消息" + i);producer.send(message);}}
}
?
超強(qiáng)干貨來襲 云風(fēng)專訪:近40年碼齡,通宵達(dá)旦的技術(shù)人生總結(jié)
以上是生活随笔為你收集整理的ActiveMQ的几种集群配置的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java实现二分查找-两种方式
- 下一篇: 使用Github(仓库管理)