Java消息中间件(activeMQ)
文章目錄
- **第一章 消息中間件概述**
- 1. 消息中間件的好處
- 2. 什么是消息中間件
- 3. 什么是JMS(規范)
- 4. 什么是AMQP(協議)
- 5. 幾個常用消息中間對比
- **第二章 初始JMS**
- **2.1 JSM相關概念**
- **2.2 隊列模式**
- **2.3 主題模式**
- **2.4 JSM編碼接口**
- **第三章 ActiveMQ的使用**
- 3.1 activeMQ在Windows平臺上的安裝
- 3.2 ActiveMQ的隊列模式
- 3.3 ActiveMQ的主題模式(發布/訂閱)
- **3.4 spring集成JMS連接ActiveMQ**
- **3.4.1 幾個相關類**
- **3.4.2 消息隊列模式與spring集成**
- 3.4.3 主題模式與spring的集成
- **第四章 ActiveMQ集群**
- **4.1 集群方式**
- **4.2 客戶端配置**
- 4.2.1. ActiveMQ失效轉移(failover):
- **4.3 Broker Cluster集群配置**
- **4.4 Master/Slave集群配置**
- **4.5 Broker clusters和Master Slave對比**
- **4.6 高可用且負載均衡的集群方案**
- 第五章 消息中間件如何傳對象
第一章 消息中間件概述
1. 消息中間件的好處
解耦、異步、橫向擴展、安全可靠、順序保證2. 什么是消息中間件
發送和接收數據,利用高效可靠的異步消息傳遞機制集成分布式系統3. 什么是JMS(規范)
Java消息服務(Java Message Service),是一個Java平臺中面向消息中間件的API4. 什么是AMQP(協議)
AMQP(advanced message queuing protocol),是一個提供統一消息服務的應用層標準協議。 此協議不受客戶端和中間件的不同產品和不同開發語言的限制。5. 幾個常用消息中間對比
| 優點 | 遵循JMS規范,安裝方便 | 繼承Erlang天生的并發性,最初用于金融行業,穩定性和安全性有保障 | 依賴zk,可動態擴展節點,高性能、高吞吐量、無限擴容、消息可指定追溯 |
| 缺點 | 有可能會丟失消息。現在的重心在下一代產品apolle上,所以5.x的產品不怎么維護了 | Erlang語言難度較大,不支持動態擴展 | 嚴格的順序機制,不支持消息優先級,不支持標準的消息協議,不利于平臺遷移 |
| 支持協議 | AMQP,OpenWire,Stomp,XMPP | AMQP | |
| 應用 | 適合中小企業,不適合好千個隊列的應用 | 適合對穩定性要求高的企業級應用 | 應用在大數據日志處理或對實時性、可靠性(少量數據丟失)要求較低的場景應用 |
第二章 初始JMS
2.1 JSM相關概念
2.2 隊列模式
1. 特性:
客戶端包括生產者和消費者
隊列中的消息只能被一個消息費者消息
消費者可以隨時消費隊列中的消息
2. 隊列模型示意圖
2.3 主題模式
1. 特性:
客戶端包括發布者和訂閱者
主題中的消息被所有訂閱者消息
消費者不能消費訂閱之前就發送到主題中的消息
2. 主題模型示意圖
2.4 JSM編碼接口
ConnectionFactory 用于創建連接到消息中間件的連接工廠
Connection 代表了應用程序和消息服務器之間的通信鏈路
Destination 指消息發布和接收的地點,包括隊列或主題
Session 表示一個單線程的上下文,用于發送和接收消息
MessageProducer 由會話創建,用于發送消息到目標
MessageConsumer 由會話創建,用于接收發送到目標的消息
Message 是在消費者和生產者之間傳送的對象, 消息頭,一組消息屬性,一個消息體
第三章 ActiveMQ的使用
3.1 activeMQ在Windows平臺上的安裝
1.下載ActiveMQ
去官方網站下載:http://activemq.apache.org/activemq-5152-release.html
2.運行ActiveMQ
解壓縮apache-activemq-5.5.1-bin.zip到C盤,然后雙擊C:\apache-activemq-5.15.2\bin\win64\activemq.bat運行ActiveMQ程序。
啟動ActiveMQ以后,登陸:http://localhost:8161/admin/,進入管理界面。
用戶名與密碼均為:admin
3.2 ActiveMQ的隊列模式
生產者代碼片:
package com.queue;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory; /*** 生產者* @author Peter**/ public class Proceducer {/*** */private final static String URL = "tcp://localhost:61616";/*** */private final static String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {// 1. 創建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創建ConnectionConnection con = factory.createConnection();// 3. 啟動連接con.start();// 4. 創建會話Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創建一個目標Destination dest = session.createQueue(QUEUE_NAME);// 6. 創建一個生產者MessageProducer pro = (MessageProducer) session.createProducer(dest);for(int i = 0; i<10;i++) {// 7. 創建消息TextMessage msg = session.createTextMessage("消息"+i); // 8. 發布消息pro.send(msg);System.out.println(msg);}// 9. 關閉連接con.close();} }執行上面代碼后,在管理界面看到的結果是:
消費者代碼片:
/*** 消費者* @author Peter*/ public class Consumer {/*** 中間件地址*/private final static String URL = "tcp://localhost:61616";/*** 中間件隊列名,與生產者的一致*/private final static String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {// 1. 創建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創建ConnectionConnection con = factory.createConnection();// 3. 啟動連接con.start();// 4. 創建會話Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5. 創建一個目標Destination dest = session.createQueue(QUEUE_NAME); // 6. 創建一個消費者MessageConsumer consumer = session.createConsumer(dest);// 7. 創建一個監聽器consumer.setMessageListener(new MessageListener() { @Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("接收消息為:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}});// 先不關閉,不然還沒接收到消息就關閉了//con.close();} }執行上面代碼后,在管理界面的結果如下:
如果我再新建一個消費者,我們會發現,兩個消費者在搶收消息,即一個消費者收到了消息,則另一個消費者就收不到該消息了。
3.3 ActiveMQ的主題模式(發布/訂閱)
由于訂閱者是收不到還未訂閱主題之前的內容的,所以必須要先啟動訂閱者。
訂閱者代碼片:
/*** 訂閱者* @author Peter**/ public class Consumer {private final static String URL = "tcp://localhost:61616";private final static String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {// 1. 創建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創建ConnectionConnection con = factory.createConnection();// 3. 啟動連接con.start();// 4. 創建會話Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5. 創建一個目標【與隊列模式的區別就在這里,相當于訂閱了該主題】Destination dest = session.createTopic(TOPIC_NAME); // 6. 創建一個消費者MessageConsumer consumer = session.createConsumer(dest);// 7. 創建一個監聽器consumer.setMessageListener(new MessageListener() { @Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("接收消息為:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}});// 先不關閉,不然還沒接收到消息就關閉了//con.close();} }發布者代碼片:
/*** 發布者* @author Peter**/ public class Proceducer {private final static String URL = "tcp://localhost:61616";private final static String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {// 1. 創建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創建ConnectionConnection con = factory.createConnection();// 3. 啟動連接con.start();// 4. 創建會話Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創建一個目標【與隊列模式的區別就在這里,相當于發布一個主題】Destination dest = session.createTopic(TOPIC_NAME);// 6. 創建一個生產者MessageProducer pro = (MessageProducer) session.createProducer(dest);for(int i = 0; i<10;i++) {// 7. 創建消息TextMessage msg = session.createTextMessage("消息"+i); // 8. 發布消息pro.send(msg);System.out.println(msg);}// 9. 關閉連接con.close();} }如果我們再新建一個訂閱者,我們會發現兩個訂閱者收到的消息完全一樣。
3.4 spring集成JMS連接ActiveMQ
我們下載的activeMQ壓縮文件里解壓后,能找到相關的jar包,但spring-jms這個可去maven倉庫下載
3.4.1 幾個相關類
1. ConnectionFactory 用于管理連接的連接工廠【也是連接池:管理JmsTemplate每次發送消息都會重新創建的連接、會話、productor】
實現類:
SingleConnectionFactory:每次都返回同一個連接
CachingConnectionFactory:繼承了SingleConnectionFactory,并實現了緩存
2.JmsTemplate 用于發送和接收消息的模板類
由spring提供,它是線程安全類,可以在整個應用范圍內應用
3.MessageListener 消息監聽器
只需實現一個只接收Message參數的onMesssage方法
3.4.2 消息隊列模式與spring集成
1. 發送消息的接口
public interface ProducerInter {public void sendMessage(String message); }2. 發送消息實現類
import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator;public class ProducerImpl implements ProducerInter {@AutowiredJmsTemplate jms;// 由于可能會有多個目標,所以一定要以注入bean的id區分@Resource(name="destination")Destination destination;@Overridepublic void sendMessage(String message) {jms.send(destination, new MessageCreator() { @Overridepublic Message createMessage(Session sessioin) throws JMSException {TextMessage msg = sessioin.createTextMessage(message);System.out.println("發送消息:"+msg.getText());return msg;}});}}3. 配置文件(producer.xml)
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"default-autowire="byName" default-lazy-init="false"><!-- 開啟注解 --><context:component-scan base-package="com.jms.spring"></context:component-scan><!-- ActiveMQ提供的 --><bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- spring提供的連接池 --><bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactoryId"/></bean><!-- 創建一個點對點的隊列目標對象 --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queuename"/></bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactoryId"/></bean><!-- --><bean id="producerImpl" class="com.jms.spring.ProducerImpl"></bean> </beans>4. 測試發送
執行之后,進入管理界面可查看結果
5. 監聽消息類
public class ConsumerMessageListener implements MessageListener{// 監聽消息@Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("收到消息:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}}6. 接收消息的配置
<?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"default-autowire="byName" default-lazy-init="false"><!-- 開啟注解 --><context:component-scan base-package="com.jms.spring"></context:component-scan><!-- ActiveMQ提供的 --><bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- spring提供的連接池 --><bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactoryId"/></bean><!-- 創建一個點對點的隊列目標對象 --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queuename"/></bean><!-- 上面的配置與producer.xml里是一樣的 --><!-- 注入消息監聽器 --><bean id="consumerMessageListener" class="com.jms.spring.ConsumerMessageListener"></bean><!-- 配置消息監聽容器 --><bean id="jmsContainerListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactoryId"/><property name="destination" ref="destination"/><property name="messageListener" ref="consumerMessageListener"/></bean></beans>7. 測試消費者
public class TestConsumer {public static void main(String[] args) {new ClassPathXmlApplicationContext("consumer.xml"); } }3.4.3 主題模式與spring的集成
只需要將配置文件中的目標對象org.apache.activemq.command.ActiveMQQueue改成org.apache.activemq.command.ActiveMQTopic即可。需要注意的是,在主題模式下,一定要先啟動消費者。
第四章 ActiveMQ集群
4.1 集群方式
客戶端集群:讓多個消費者消費同一個隊列
Broker clusters:多個Broker之間同步消息
Master Slave(主從):實現高可用
4.2 客戶端配置
4.2.1. ActiveMQ失效轉移(failover):
定義:允許當其中一臺消息服務器宕機時,客戶端在傳輸層上重新連接到其它消息服務器
語法:failover:(uri1,uri2,…,uriN)?transportOptions
transportOptions參數說明
randomize 默認為true,表示在uri列表中選擇uri連接時,是否采用隨機策略
initialReconnectDelay 默認為10,單位毫秒,表示第一嘗試重連之間等待的時間
maxReconnectionDelay 默認30000,單位毫秒,最長重連的時間間隔
4.3 Broker Cluster集群配置
1. 原理:
2. NetworkConnector(網絡連接器)
網絡連接器主要用于配置ActiveMQ服務器與服務器之間的網絡通訊方式,用于服務器透傳消息
分為靜態連接器和動態連接器
3. 靜態連接器:適用連接地址不多的情況
<networkConnectors><networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"> </networkConnectors>4. 動態連接器
<networkConnectors><networkConnector uri="multicast://default"> </networkConnectors> <transportConnectors><transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"> </transportConnectors>4.4 Master/Slave集群配置
1. Master/Slave集群方案
Share nothing storage master/slave (5.8 以后的版本刪除了)
Share storage master/slave 共享存儲
Replicated LevelDB Store 基于可復制的LevelDB Store
2. 共享存儲集群的原理
先啟動A,A就因為排他鎖獨占資源成為Master,此時A有外部服務能力,而B沒有
如果A掛了,則B獲取資源成為Master,這時所有請求都會交給B
3. 基于復制的LevelDB Store的原理
因為是基于ZooKeeper的,所以至少需要3勸服務器。zk選舉A作為Master后,A就具有了外部服務能力,而B、C沒有。當A獲取到外部資源存儲后,會通過zk將資源同步到B和C。
如果A故障,則zk會重新選舉一個節點作為Master
4.5 Broker clusters和Master Slave對比
| Master/Slave | 是 | 否 |
| Broker Cluster | 否 | 是 |
4.6 高可用且負載均衡的集群方案
第五章 消息中間件如何傳對象
利用Json
總結
以上是生活随笔為你收集整理的Java消息中间件(activeMQ)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: solr管理界面详解
- 下一篇: Java 多态的简单介绍.