日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Java消息中间件(activeMQ)

發布時間:2025/3/20 java 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Java消息中间件(activeMQ) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

  • **第一章 消息中間件概述**
      • 1. 消息中間件的好處
      • 2. 什么是消息中間件
      • 3. 什么是JMS(規范)
      • 4. 什么是AMQP(協議)
      • 5. 幾個常用消息中間對比
  • **第二章 初始JMS**
      • **2.1 JSM相關概念**
      • **2.2 隊列模式**
      • **2.3 主題模式**
      • **2.4 JSM編碼接口**
  • **第三章 ActiveMQ的使用**
      • 3.1 activeMQ在Windows平臺上的安裝
      • 3.2 ActiveMQ的隊列模式
      • 3.3 ActiveMQ的主題模式(發布/訂閱)
      • **3.4 spring集成JMS連接ActiveMQ**
        • **3.4.1 幾個相關類**
        • **3.4.2 消息隊列模式與spring集成**
        • 3.4.3 主題模式與spring的集成
  • **第四章 ActiveMQ集群**
      • **4.1 集群方式**
      • **4.2 客戶端配置**
        • 4.2.1. ActiveMQ失效轉移(failover):
      • **4.3 Broker Cluster集群配置**
      • **4.4 Master/Slave集群配置**
      • **4.5 Broker clusters和Master Slave對比**
      • **4.6 高可用且負載均衡的集群方案**
  • 第五章 消息中間件如何傳對象

第一章 消息中間件概述

1. 消息中間件的好處

解耦、異步、橫向擴展、安全可靠、順序保證

2. 什么是消息中間件

發送和接收數據,利用高效可靠的異步消息傳遞機制集成分布式系統

3. 什么是JMS(規范)

Java消息服務(Java Message Service),是一個Java平臺中面向消息中間件的API

4. 什么是AMQP(協議)

AMQP(advanced message queuing protocol),是一個提供統一消息服務的應用層標準協議。 此協議不受客戶端和中間件的不同產品和不同開發語言的限制。

5. 幾個常用消息中間對比

.ActiveMQRabbitMQKafka
優點遵循JMS規范,安裝方便繼承Erlang天生的并發性,最初用于金融行業,穩定性和安全性有保障依賴zk,可動態擴展節點,高性能、高吞吐量、無限擴容、消息可指定追溯
缺點有可能會丟失消息。現在的重心在下一代產品apolle上,所以5.x的產品不怎么維護了Erlang語言難度較大,不支持動態擴展嚴格的順序機制,不支持消息優先級,不支持標準的消息協議,不利于平臺遷移
支持協議AMQP,OpenWire,Stomp,XMPPAMQP
應用適合中小企業,不適合好千個隊列的應用適合對穩定性要求高的企業級應用應用在大數據日志處理或對實時性、可靠性(少量數據丟失)要求較低的場景應用

第二章 初始JMS

2.1 JSM相關概念

  • 提供者: 實現JMS規范的消息中間件服務器
  • 客戶端:發送或接收消息的應用程序
  • 生產者/發布者: 創建并發送消息的客戶端
  • 消費者/訂閱者:接收并處理消息的客戶端
  • 消息:應用程序之間傳遞的數據內容
  • 消息模式:在客戶端之間傳遞消息的方式,JMS中定義了主題和隊列兩種模式
  • 2.2 隊列模式

    1. 特性
    客戶端包括生產者和消費者
    隊列中的消息只能被一個消息費者消息
    消費者可以隨時消費隊列中的消息

    2. 隊列模型示意圖

    2.3 主題模式

    1. 特性
    客戶端包括發布者和訂閱者
    主題中的消息被所有訂閱者消息
    消費者不能消費訂閱之前就發送到主題中的消息

    2. 主題模型示意圖

    2.4 JSM編碼接口

    ConnectionFactory 用于創建連接到消息中間件的連接工廠
    Connection 代表了應用程序和消息服務器之間的通信鏈路
    Destination 指消息發布和接收的地點,包括隊列或主題
    Session 表示一個單線程的上下文,用于發送和接收消息
    MessageProducer 由會話創建,用于發送消息到目標
    MessageConsumer 由會話創建,用于接收發送到目標的消息
    Message 是在消費者和生產者之間傳送的對象, 消息頭,一組消息屬性,一個消息體

    第三章 ActiveMQ的使用

    3.1 activeMQ在Windows平臺上的安裝

    1.下載ActiveMQ
    去官方網站下載:http://activemq.apache.org/activemq-5152-release.html

    2.運行ActiveMQ
    解壓縮apache-activemq-5.5.1-bin.zip到C盤,然后雙擊C:\apache-activemq-5.15.2\bin\win64\activemq.bat運行ActiveMQ程序。

    啟動ActiveMQ以后,登陸:http://localhost:8161/admin/,進入管理界面。
    用戶名與密碼均為:admin

    3.2 ActiveMQ的隊列模式

    生產者代碼片

    package com.queue;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory; /*** 生產者* @author Peter**/ public class Proceducer {/*** */private final static String URL = "tcp://localhost:61616";/*** */private final static String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {// 1. 創建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創建ConnectionConnection con = factory.createConnection();// 3. 啟動連接con.start();// 4. 創建會話Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創建一個目標Destination dest = session.createQueue(QUEUE_NAME);// 6. 創建一個生產者MessageProducer pro = (MessageProducer) session.createProducer(dest);for(int i = 0; i<10;i++) {// 7. 創建消息TextMessage msg = session.createTextMessage("消息"+i); // 8. 發布消息pro.send(msg);System.out.println(msg);}// 9. 關閉連接con.close();} }

    執行上面代碼后,在管理界面看到的結果是:

    消費者代碼片

    /*** 消費者* @author Peter*/ public class Consumer {/*** 中間件地址*/private final static String URL = "tcp://localhost:61616";/*** 中間件隊列名,與生產者的一致*/private final static String QUEUE_NAME = "queue-name";public static void main(String[] args) throws JMSException {// 1. 創建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創建ConnectionConnection con = factory.createConnection();// 3. 啟動連接con.start();// 4. 創建會話Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5. 創建一個目標Destination dest = session.createQueue(QUEUE_NAME); // 6. 創建一個消費者MessageConsumer consumer = session.createConsumer(dest);// 7. 創建一個監聽器consumer.setMessageListener(new MessageListener() { @Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("接收消息為:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}});// 先不關閉,不然還沒接收到消息就關閉了//con.close();} }

    執行上面代碼后,在管理界面的結果如下:

    如果我再新建一個消費者,我們會發現,兩個消費者在搶收消息,即一個消費者收到了消息,則另一個消費者就收不到該消息了。

    3.3 ActiveMQ的主題模式(發布/訂閱)

    由于訂閱者是收不到還未訂閱主題之前的內容的,所以必須要先啟動訂閱者。

    訂閱者代碼片:

    /*** 訂閱者* @author Peter**/ public class Consumer {private final static String URL = "tcp://localhost:61616";private final static String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {// 1. 創建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創建ConnectionConnection con = factory.createConnection();// 3. 啟動連接con.start();// 4. 創建會話Session session = con.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);// 5. 創建一個目標【與隊列模式的區別就在這里,相當于訂閱了該主題】Destination dest = session.createTopic(TOPIC_NAME); // 6. 創建一個消費者MessageConsumer consumer = session.createConsumer(dest);// 7. 創建一個監聽器consumer.setMessageListener(new MessageListener() { @Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("接收消息為:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}});// 先不關閉,不然還沒接收到消息就關閉了//con.close();} }

    發布者代碼片

    /*** 發布者* @author Peter**/ public class Proceducer {private final static String URL = "tcp://localhost:61616";private final static String TOPIC_NAME = "topic-name";public static void main(String[] args) throws JMSException {// 1. 創建ConnectionFactoryConnectionFactory factory = new ActiveMQConnectionFactory(URL);// 2. 創建ConnectionConnection con = factory.createConnection();// 3. 啟動連接con.start();// 4. 創建會話Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創建一個目標【與隊列模式的區別就在這里,相當于發布一個主題】Destination dest = session.createTopic(TOPIC_NAME);// 6. 創建一個生產者MessageProducer pro = (MessageProducer) session.createProducer(dest);for(int i = 0; i<10;i++) {// 7. 創建消息TextMessage msg = session.createTextMessage("消息"+i); // 8. 發布消息pro.send(msg);System.out.println(msg);}// 9. 關閉連接con.close();} }

    如果我們再新建一個訂閱者,我們會發現兩個訂閱者收到的消息完全一樣。

    3.4 spring集成JMS連接ActiveMQ

    我們下載的activeMQ壓縮文件里解壓后,能找到相關的jar包,但spring-jms這個可去maven倉庫下載

    3.4.1 幾個相關類

    1. ConnectionFactory 用于管理連接的連接工廠【也是連接池:管理JmsTemplate每次發送消息都會重新創建的連接、會話、productor】
    實現類:
    SingleConnectionFactory:每次都返回同一個連接
    CachingConnectionFactory:繼承了SingleConnectionFactory,并實現了緩存

    2.JmsTemplate 用于發送和接收消息的模板類
    由spring提供,它是線程安全類,可以在整個應用范圍內應用

    3.MessageListener 消息監聽器
    只需實現一個只接收Message參數的onMesssage方法

    3.4.2 消息隊列模式與spring集成

    1. 發送消息的接口

    public interface ProducerInter {public void sendMessage(String message); }

    2. 發送消息實現類

    import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage;import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator;public class ProducerImpl implements ProducerInter {@AutowiredJmsTemplate jms;// 由于可能會有多個目標,所以一定要以注入bean的id區分@Resource(name="destination")Destination destination;@Overridepublic void sendMessage(String message) {jms.send(destination, new MessageCreator() { @Overridepublic Message createMessage(Session sessioin) throws JMSException {TextMessage msg = sessioin.createTextMessage(message);System.out.println("發送消息:"+msg.getText());return msg;}});}}

    3. 配置文件(producer.xml)

    <?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"default-autowire="byName" default-lazy-init="false"><!-- 開啟注解 --><context:component-scan base-package="com.jms.spring"></context:component-scan><!-- ActiveMQ提供的 --><bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- spring提供的連接池 --><bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactoryId"/></bean><!-- 創建一個點對點的隊列目標對象 --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queuename"/></bean><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactoryId"/></bean><!-- --><bean id="producerImpl" class="com.jms.spring.ProducerImpl"></bean> </beans>

    4. 測試發送
    執行之后,進入管理界面可查看結果

    public class TestProducer {public static void main(String[] args) {// 從classpath下加載配置文件ApplicationContext applicationContext = new ClassPathXmlApplicationContext("producer.xml");ProducerImpl pro = (ProducerImpl) applicationContext.getBean("producerImpl");pro.sendMessage("hello world");} }

    5. 監聽消息類

    public class ConsumerMessageListener implements MessageListener{// 監聽消息@Overridepublic void onMessage(Message message) {TextMessage msg = (TextMessage) message;try {System.out.println("收到消息:"+msg.getText());} catch (JMSException e) {e.printStackTrace();}}}

    6. 接收消息的配置

    <?xml version="1.0" encoding="utf-8"?> <beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"xmlns:p="http://www.springframework.org/schema/p" xmlns:cache="http://www.springframework.org/schema/cache"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsdhttp://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"default-autowire="byName" default-lazy-init="false"><!-- 開啟注解 --><context:component-scan base-package="com.jms.spring"></context:component-scan><!-- ActiveMQ提供的 --><bean id="targetConnectionFactoryId" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://localhost:61616"/></bean><!-- spring提供的連接池 --><bean id="connectionFactoryId" class="org.springframework.jms.connection.SingleConnectionFactory"><property name="targetConnectionFactory" ref="targetConnectionFactoryId"/></bean><!-- 創建一個點對點的隊列目標對象 --><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queuename"/></bean><!-- 上面的配置與producer.xml里是一樣的 --><!-- 注入消息監聽器 --><bean id="consumerMessageListener" class="com.jms.spring.ConsumerMessageListener"></bean><!-- 配置消息監聽容器 --><bean id="jmsContainerListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactoryId"/><property name="destination" ref="destination"/><property name="messageListener" ref="consumerMessageListener"/></bean></beans>

    7. 測試消費者

    public class TestConsumer {public static void main(String[] args) {new ClassPathXmlApplicationContext("consumer.xml"); } }

    3.4.3 主題模式與spring的集成

    只需要將配置文件中的目標對象org.apache.activemq.command.ActiveMQQueue改成org.apache.activemq.command.ActiveMQTopic即可。需要注意的是,在主題模式下,一定要先啟動消費者。

    第四章 ActiveMQ集群

    4.1 集群方式

    客戶端集群:讓多個消費者消費同一個隊列
    Broker clusters:多個Broker之間同步消息
    Master Slave(主從):實現高可用

    4.2 客戶端配置

    4.2.1. ActiveMQ失效轉移(failover):

    定義:允許當其中一臺消息服務器宕機時,客戶端在傳輸層上重新連接到其它消息服務器
    語法:failover:(uri1,uri2,…,uriN)?transportOptions
    transportOptions參數說明
    randomize 默認為true,表示在uri列表中選擇uri連接時,是否采用隨機策略
    initialReconnectDelay 默認為10,單位毫秒,表示第一嘗試重連之間等待的時間
    maxReconnectionDelay 默認30000,單位毫秒,最長重連的時間間隔

    4.3 Broker Cluster集群配置

    1. 原理:

    2. NetworkConnector(網絡連接器)
    網絡連接器主要用于配置ActiveMQ服務器與服務器之間的網絡通訊方式,用于服務器透傳消息
    分為靜態連接器和動態連接器

    3. 靜態連接器:適用連接地址不多的情況

    <networkConnectors><networkConnector uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)"> </networkConnectors>

    4. 動態連接器

    <networkConnectors><networkConnector uri="multicast://default"> </networkConnectors> <transportConnectors><transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"> </transportConnectors>

    4.4 Master/Slave集群配置

    1. Master/Slave集群方案
    Share nothing storage master/slave (5.8 以后的版本刪除了)
    Share storage master/slave 共享存儲
    Replicated LevelDB Store 基于可復制的LevelDB Store

    2. 共享存儲集群的原理
    先啟動A,A就因為排他鎖獨占資源成為Master,此時A有外部服務能力,而B沒有

    如果A掛了,則B獲取資源成為Master,這時所有請求都會交給B

    3. 基于復制的LevelDB Store的原理
    因為是基于ZooKeeper的,所以至少需要3勸服務器。zk選舉A作為Master后,A就具有了外部服務能力,而B、C沒有。當A獲取到外部資源存儲后,會通過zk將資源同步到B和C。

    如果A故障,則zk會重新選舉一個節點作為Master

    4.5 Broker clusters和Master Slave對比

    .高可用負載均衡
    Master/Slave
    Broker Cluster

    4.6 高可用且負載均衡的集群方案

    第五章 消息中間件如何傳對象

    利用Json

    總結

    以上是生活随笔為你收集整理的Java消息中间件(activeMQ)的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。