java 消息队列服务_ActiveMQ 消息队列服务
1?ActiveMQ簡介
1.1?ActiveMQ是什么
ActiveMQ是一個消息隊列應(yīng)用服務(wù)器(推送服務(wù)器)。支持JMS規(guī)范。
1.1.1?JMS概述
全稱:Java Message Service ,即為Java消息服務(wù),是一套java消息服務(wù)的API標(biāo)準(zhǔn)。(標(biāo)準(zhǔn)即接口)
實現(xiàn)了JMS標(biāo)準(zhǔn)的系統(tǒng),稱之為JMS Provider。
1.1.2?消息隊列
1.1.2.1?概念
消息隊列是在消息的傳輸過程中保存消息的容器,提供一種不同進程或者同一進程不同線程直接通訊的方式。
Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到Broker;
Broker:消息處理中心。負(fù)責(zé)消息存儲、確認(rèn)、重試等,一般其中會包含多個queue;
Consumer:消息消費者,負(fù)責(zé)從Broker中獲取消息,并進行相應(yīng)處理;
1.1.2.2?常見消息隊列應(yīng)用
(1)、ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實現(xiàn)。
(2)、RabbitMQ
RabbitMQ是一個在AMQP基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng)。他遵循Mozilla Public License開源協(xié)議。開發(fā)語言為Erlang。
(3)、RocketMQ
由阿里巴巴定義開發(fā)的一套消息隊列應(yīng)用服務(wù)。
1.2?ActiveMQ能做什么
(1)實現(xiàn)兩個不同應(yīng)用(程序)之間的消息通訊。
(2)實現(xiàn)同一個應(yīng)用,不同模塊之間的消息通訊。(確保數(shù)據(jù)發(fā)送的穩(wěn)定性)
1.3?ActiveMQ下載
ActiveMQ下載地址:http://activemq.apache.org/download-archives.html
--可供下載的歷史版本
--說明:
ActiveMQ 5.10.x以上版本必須使用JDK1.8才能正常使用。
ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。
--根據(jù)操作系統(tǒng),選擇下載版本。(本教程下載Linux版本)
1.4?ActiveMQ主要特點
(1)支持多語言、多協(xié)議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應(yīng)用協(xié)議:OpenWire,Stomp REST,WS Notification,XMPP,AMQP
(2)對Spring的支持,ActiveMQ可以很容易整合到Spring的系統(tǒng)里面去。
(3)支持高可用、高性能的集群模式。
2?入門示例
2.1?需求
使用ActiveMQ實現(xiàn)消息隊列模型。
2.2?配置步驟說明
(1)搭建ActiveMQ消息服務(wù)器。
(2)創(chuàng)建一個java項目。
(3)創(chuàng)建消息生產(chǎn)者,發(fā)送消息。
(4)創(chuàng)建消息消費者,接收消息。
2.3?第一部分:搭建ActiveMQ消息服務(wù)器
2.3.1?第一步:下載、上傳至Linux
--說明:確保已經(jīng)安裝了jdk
2.3.2?第二步:安裝到/usr/local/activemq目錄
(1)解壓到/usr/local目錄下
[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz?-C /usr/local
(2)修改名稱為activemq
[root@node07192 ~]# cd /usr/local/
[root@node07192 local]# mv apache-activemq-5.9.0/ activemq
2.3.3?第三步:啟動ActiveMQ服務(wù)器
--說明:ActiveMQ是免安裝軟件,解壓即可啟動服務(wù)。
[root@node07192 local]# cd activemq/bin
[root@node07192 bin]# ./activemq start
--查看ActiveMQ啟動狀態(tài)
[root@node07192 bin]# ./activemq status
2.3.4?第四步:瀏覽器訪問ActiveMQ管理界面
2.3.4.1?Step1:查看ActiveMQ管理界面的服務(wù)端口。在/conf/jetty.xml中
--訪問管理控制臺的服務(wù)端口,默認(rèn)為:8161
[root@node07192 bin]# cd ../conf
[root@node07192 conf]# vim jetty.xml
2.3.4.2?Step2:查看ActiveMQ用戶、密碼。在/conf/users.properties中:
--默認(rèn)的用戶名、密碼均為amdin
[root@node07192 conf]# vim users.properties
2.3.4.3?Step3:訪問ActiveMQ管理控制臺。地址:http://ip:8161/
--注意:防火墻是沒有配置該服務(wù)的端口的。
因此,要訪問該服務(wù),必須在防火墻中配置。
(1)修改防火墻,開放8161端口
[root@node07192 conf]# vim /etc/sysconfig/iptables
(2)重啟防火墻
[root@node07192 conf]# service iptables restart
(3)登錄管理控制臺
--登陸,用戶名、密碼均為admin
--控制臺主界面
--搭建ActiveMQ服務(wù)器成功!!!
2.4?第二部分:創(chuàng)建java項目,導(dǎo)入jar包
--導(dǎo)包說明:
ActiveMQ的解壓包中,提供了運行ActiveMQ的所有jar。
--創(chuàng)建項目
2.5?第三部分:創(chuàng)建消息生成者,發(fā)送消息
--說明:ActiveMQ是實現(xiàn)了JMS規(guī)范的。在實現(xiàn)消息服務(wù)的時候,必須基于API接口規(guī)范。
2.5.1?JMS常用的API說明
下述API都是接口類型,定義在javax.jms包中,是JMS標(biāo)準(zhǔn)接口定義。ActiveMQ完全實現(xiàn)這一套api標(biāo)準(zhǔn)。
2.5.1.1?ConnectionFactory
鏈接工廠, 用于創(chuàng)建鏈接的工廠類型。
2.5.1.2?Connection
鏈接,用于建立訪問ActiveMQ連接的類型,由鏈接工廠創(chuàng)建。
2.5.1.3?Session
會話, 一次持久有效、有狀態(tài)的訪問,由鏈接創(chuàng)建。
2.5.1.4?Destination ?& ?Queue & Topic
目的地, 即本次訪問ActiveMQ消息隊列的地址,由Session會話創(chuàng)建。
(1)interfaceQueue?extends Destination
(2)Queue:隊列模型,只有一個消費者。消息一旦被消費,默認(rèn)刪除。
(3)Topic:主題訂閱中的消息,會發(fā)送給所有的消費者同時處理。
2.5.1.5?Message
消息,在消息傳遞過程中數(shù)據(jù)載體對象,是所有消息【文本消息TextMessage,對象消息ObjectMessage等】具體類型的頂級接口,可以通過會話創(chuàng)建或通過會話從ActiveMQ服務(wù)中獲取。
2.5.1.6?MessageProducer
消息生成者, 在一次有效會話中,用于發(fā)送消息給ActiveMQ服務(wù)的工具,由Session會話創(chuàng)建。
2.5.1.7?MessageCustomer
消息消費者【消息訂閱者,消息處理者】, 在一次有效會話中,用于ActiveMQ服務(wù)中獲取消息的工具,由Session會話創(chuàng)建。
我們定義的消息生產(chǎn)者和消費者,都是基于上面API實現(xiàn)的。
2.5.2?第一步:創(chuàng)建MyProducer類,定義sendMessage方法
package?cn.gzsxt.mq.producer;
import?javax.jms.Connection;
import?javax.jms.ConnectionFactory;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageProducer;
import?javax.jms.Session;
import?org.apache.activemq.ActiveMQConnectionFactory;
public?class?MyProducer {
// 定義鏈接工廠
ConnectionFactory connectionFactory?= null;
// 定義鏈接
Connection connection?= null;
// 定義會話
Session session?= null;
// 定義目的地
Destination destination?= null;
// 定義消息生成者
MessageProducer producer?= null;
// 定義消息
Message message?= null;
public?void?sendToMQ(){
try{
/*
* 創(chuàng)建鏈接工廠
* ActiveMQConnectionFactory - 由ActiveMQ實現(xiàn)的ConnectionFactory接口實現(xiàn)類.
* 構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* ?userName - 訪問ActiveMQ服務(wù)的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.
* ?password - 訪問ActiveMQ服務(wù)的密碼,密碼可以通過jetty-realm.properties配置文件配置.
* ?brokerURL - 訪問ActiveMQ服務(wù)的路徑地址.路徑結(jié)構(gòu)為-協(xié)議名://主機地址:端口號
* ?????此鏈接基于TCP/IP協(xié)議.
*/
connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創(chuàng)建鏈接對象
connection?= connectionFactory.createConnection();
// 啟動鏈接
connection.start();
/*
* 創(chuàng)建會話對象
* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);
* ?transacted - 是否使用事務(wù),可選值為true|false
* ?????true - 使用事務(wù),當(dāng)設(shè)置此變量值,則acknowledgeMode參數(shù)無效,建議傳遞的acknowledgeMode參數(shù)值為
* ?????????Session.SESSION_TRANSACTED
* ?????false - 不使用事務(wù),設(shè)置此變量值,則acknowledgeMode參數(shù)必須設(shè)置.
* ?acknowledgeMode - 消息確認(rèn)機制,可選值為:
* ?????Session.AUTO_ACKNOWLEDGE - 自動確認(rèn)消息機制
* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機制
* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機制
*/
session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應(yīng)的隊列
destination?= session.createQueue("test-mq");
// 創(chuàng)建消息生成者,創(chuàng)建的消息生成者與某目的地對應(yīng),即方法參數(shù)目的地.
producer?= session.createProducer(destination);
// 創(chuàng)建消息對象,創(chuàng)建一個文本消息,此消息對象中保存要傳遞的文本數(shù)據(jù).
message?= session.createTextMessage("hello,activeme");
// 發(fā)送消息
producer.send(message);
System.out.println("消息發(fā)送成功!");
}catch(Exception e){
e.printStackTrace();
System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯誤!!");
}finally{
try?{
// 回收消息發(fā)送者資源
if(null?!= producer)
producer.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收會話資源
if(null?!= session)
session.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收鏈接資源
if(null?!= connection)
connection.close();
} catch?(JMSException e) {
e.printStackTrace();
}
}
}
}
2.5.3?第二步:創(chuàng)建一個測試類MessageTest
--添加junit類庫,快捷鍵ctrl+1
package?cn.gzsxt.mq.test;
import?org.junit.Test;
import?cn.gzsxt.mq.producer.MyProducer;
public?class?MessageTest {
@Test
public?void?sendToMQ(){
MyProducer producer?= new?MyProducer();
producer.sendToMQ();
}
}
2.5.4?第三步:測試
(1)設(shè)置防火墻,配置61616端口。注意修改之后重啟防火墻。
(2)測試結(jié)果:
--查看控制臺
--查看ActiveMQ管理控制界面
--消息發(fā)送成功!!!
2.6?第四部分:創(chuàng)建消息消費者,消費消息
2.6.1?第一步:創(chuàng)建MyConsumer類
package?cn.gzsxt.mq.consumer;
import?javax.jms.Connection;
import?javax.jms.ConnectionFactory;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageConsumer;
import?javax.jms.Session;
import?javax.jms.TextMessage;
import?org.apache.activemq.ActiveMQConnectionFactory;
/**
* @ClassName:MyConsumer
* @Description: 消息消費者代碼
*/
public?class?MyConsumer {
// 定義鏈接工廠
ConnectionFactory connectionFactory?= null;
// 定義鏈接
Connection connection?= null;
// 定義會話
Session session?= null;
// 定義目的地
Destination destination?= null;
// 定義消息消費者
MessageConsumer consumer?= null;
// 定義消息
Message message?= null;
public?void?recieveFromMQ(){
try{
/*
* 創(chuàng)建鏈接工廠
* ActiveMQConnectionFactory - 由ActiveMQ實現(xiàn)的ConnectionFactory接口實現(xiàn)類.
* 構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* ?userName - 訪問ActiveMQ服務(wù)的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.
* ?password - 訪問ActiveMQ服務(wù)的密碼,密碼可以通過jetty-realm.properties配置文件配置.
* ?brokerURL - 訪問ActiveMQ服務(wù)的路徑地址.路徑結(jié)構(gòu)為-協(xié)議名://主機地址:端口號
* ?????此鏈接基于TCP/IP協(xié)議.
*/
connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創(chuàng)建鏈接對象
connection?= connectionFactory.createConnection();
// 啟動鏈接
connection.start();
/*
* 創(chuàng)建會話對象
* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);
* ?transacted - 是否使用事務(wù),可選值為true|false
* ?????true - 使用事務(wù),當(dāng)設(shè)置此變量值,則acknowledgeMode參數(shù)無效,建議傳遞的acknowledgeMode參數(shù)值為
* ?????????Session.SESSION_TRANSACTED
* ?????false - 不使用事務(wù),設(shè)置此變量值,則acknowledgeMode參數(shù)必須設(shè)置.
* ?acknowledgeMode - 消息確認(rèn)機制,可選值為:
* ?????Session.AUTO_ACKNOWLEDGE - 自動確認(rèn)消息機制
* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機制
* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機制
*/
session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應(yīng)的隊列
destination?= session.createQueue("test-mq");
// 創(chuàng)建消息消費者,創(chuàng)建的消息消費者與某目的地對應(yīng),即方法參數(shù)目的地.
consumer?= session.createConsumer(destination);
// 從ActiveMQ服務(wù)中獲取消息
message?= consumer.receive();
TextMessage tMsg?= (TextMessage) message;
System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
}catch(Exception e){
e.printStackTrace();
System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯誤!!");
}finally{
try?{
// 回收消息消費者資源
if(null?!= consumer)
consumer.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收會話資源
if(null?!= session)
session.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收鏈接資源
if(null?!= connection)
connection.close();
} catch?(JMSException e) {
e.printStackTrace();
}
}
}
}
2.6.2?第二步:修改測試類MessageTest,新增測試方法
@Test
public?void?recieveFromMQ(){
MyConsumer consumer?= new?MyConsumer();
consumer.recieveFromMQ();
}
2.6.3?第三步:測試
--查看Eclipse控制臺
--查看ActiveMQ管理控制界面
--消息被消費了,測試成功!!!
3?ActiveMQ監(jiān)聽器
問題:在前面的示例中,我們發(fā)現(xiàn)消費者每次只能消費一條消息。當(dāng)隊列中有多條消息的時候,我們需要多次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?我們希望一次將所有的消息全部接收。
答:使用ActiveMQ監(jiān)聽器來監(jiān)聽隊列,持續(xù)消費消息。
3.1?配置步驟說明
(1)創(chuàng)建一個監(jiān)聽器對象。
(2)修改消費者代碼,加載監(jiān)聽器。
3.2?配置步驟
3.2.1?第一步:創(chuàng)建監(jiān)聽器MyListener類
--說明:自定義監(jiān)聽器需要實現(xiàn)MessageListener接口
package?cn.gzsxt.mq.listener;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageListener;
import?javax.jms.TextMessage;
public?class?MyListener implements?MessageListener{
@Override
public?void?onMessage(Message message) {
if(null!=message){
TextMessage tMsg?= (TextMessage) message;
try?{
System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
} catch?(JMSException e) {
e.printStackTrace();
}
}
}
}
3.2.2?第二步:修改MyConsumer代碼,加載監(jiān)聽器
--說明:監(jiān)聽器需要持續(xù)加載,因此消費程序不能結(jié)束。
這里我們使用輸入流阻塞消費線程結(jié)束。(實際開發(fā)中,使用web項目加載)
package?cn.gzsxt.mq.consumer;
import?javax.jms.Connection;
import?javax.jms.ConnectionFactory;
import?javax.jms.Destination;
import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageConsumer;
import?javax.jms.Session;
import?javax.jms.TextMessage;
import?org.apache.activemq.ActiveMQConnectionFactory;
import?cn.gzsxt.mq.listener.MyListener;
/**
* @ClassName:MyConsumer
* @Description: 消息消費者代碼
*/
public?class?MyConsumer {
// 定義鏈接工廠
ConnectionFactory connectionFactory?= null;
// 定義鏈接
Connection connection?= null;
// 定義會話
Session session?= null;
// 定義目的地
Destination destination?= null;
// 定義消息消費者
MessageConsumer consumer?= null;
// 定義消息
Message message?= null;
public?Message recieveFromMQ(){
try{
/*
* 創(chuàng)建鏈接工廠
* ActiveMQConnectionFactory - 由ActiveMQ實現(xiàn)的ConnectionFactory接口實現(xiàn)類.
* 構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
* ?userName - 訪問ActiveMQ服務(wù)的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.
* ?password - 訪問ActiveMQ服務(wù)的密碼,密碼可以通過jetty-realm.properties配置文件配置.
* ?brokerURL - 訪問ActiveMQ服務(wù)的路徑地址.路徑結(jié)構(gòu)為-協(xié)議名://主機地址:端口號
* ?????此鏈接基于TCP/IP協(xié)議.
*/
connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創(chuàng)建鏈接對象
connection?= connectionFactory.createConnection();
// 啟動鏈接
connection.start();
/*
* 創(chuàng)建會話對象
* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);
* ?transacted - 是否使用事務(wù),可選值為true|false
* ?????true - 使用事務(wù),當(dāng)設(shè)置此變量值,則acknowledgeMode參數(shù)無效,建議傳遞的acknowledgeMode參數(shù)值為
* ?????????Session.SESSION_TRANSACTED
* ?????false - 不使用事務(wù),設(shè)置此變量值,則acknowledgeMode參數(shù)必須設(shè)置.
* ?acknowledgeMode - 消息確認(rèn)機制,可選值為:
* ?????Session.AUTO_ACKNOWLEDGE - 自動確認(rèn)消息機制
* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機制
* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機制
*/
session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應(yīng)的隊列
destination?= session.createQueue("test-mq");
// 創(chuàng)建消息消費者,創(chuàng)建的消息消費者與某目的地對應(yīng),即方法參數(shù)目的地.
consumer?= session.createConsumer(destination);
// // 從ActiveMQ服務(wù)中獲取消息
// message = consumer.receive();
//
// TextMessage tMsg = (TextMessage) message;
//
// System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
//加載監(jiān)聽器
consumer.setMessageListener(newMyListener());
//監(jiān)聽器需要持續(xù)加載,這里我們使用輸入流阻塞當(dāng)前線程結(jié)束。
System.in.read();
}catch(Exception e){
e.printStackTrace();
System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯誤!!");
}finally{
try?{
// 回收消息消費者資源
if(null?!= consumer)
consumer.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收會話資源
if(null?!= session)
session.close();
} catch?(JMSException e) {
e.printStackTrace();
}
try?{
// 回收鏈接資源
if(null?!= connection)
connection.close();
} catch?(JMSException e) {
e.printStackTrace();
}
}
return?message;
}
}
3.3?測試
(1)多次運行生產(chǎn)者,發(fā)送多條消息到隊列中。
(2)運行消費者。觀察結(jié)果
--查看Eclipse控制臺,一次消費了3條消息
--查看ActiveMQ管理控制界面,所有消息都被消費了!
--測試成功!!!
4?ActiveMQ消息服務(wù)模式
問題:在入門示例中,只能向一個消費者發(fā)送消息。但是有一些場景,需求有多個消費者都能接收到消息,比如:美團APP每天的消息推送。該如何實現(xiàn)呢?
答:ActiveMQ是通過不同的服務(wù)模式來解決這個問題的。
所以,要搞清楚這個問題,必須知道ActiveMQ有哪些應(yīng)用模式。
4.1?PTP模式(point to point)
--消息模型
消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費者從queue中取出并且消費消息。
消息被消費以后,queue中不再有存儲,所以消息消費者不可能消費到已經(jīng)被消費的消息。
Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費、其它的則不能消費此消息了。
當(dāng)消費者不存在時,消息會一直保存,直到有消費消費
我們的入門示例,就是采用的這種PTP服務(wù)模式。
4.2?TOPIC(主題訂閱模式)
--消息模型
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時有多個消息消費者(訂閱)消費該消息。
和點對點方式不同,發(fā)布到topic的消息會被所有訂閱者消費。
當(dāng)生產(chǎn)者發(fā)布消息,不管是否有消費者。都不會保存消息
所以,主題訂閱模式下,一定要先有消息的消費者(訂閱者),后有消息的生產(chǎn)者(發(fā)布者)。
我們前面已經(jīng)實現(xiàn)了PTP模式,下面我們來實現(xiàn)TOPIC模式。
5?Topic模式實現(xiàn)
5.1?配置步驟說明
發(fā)表于 2019-08-01 00:00
閱讀 ( 411 )
總結(jié)
以上是生活随笔為你收集整理的java 消息队列服务_ActiveMQ 消息队列服务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java int 传引用吗_Java的参
- 下一篇: java 内存 开发 经验_有一到五年开