日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java 消息队列服务_ActiveMQ 消息队列服务

發(fā)布時間:2023/12/2 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 消息队列服务_ActiveMQ 消息队列服务 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1?ActiveMQ簡介

1.1?ActiveMQ是什么

ActiveMQ是一個消息隊列應(yīng)用服務(wù)器(推送服務(wù)器)。支持JMS規(guī)范。

1.1.1?JMS概述

全稱:Java Message Service ,即為Java消息服務(wù),是一套java消息服務(wù)的API標(biāo)準(zhǔn)。(標(biāo)準(zhǔn)即接口)

實現(xiàn)了JMS標(biāo)準(zhǔn)的系統(tǒng),稱之為JMS Provider。

1.1.2?消息隊列

1.1.2.1?概念

消息隊列是在消息的傳輸過程中保存消息的容器,提供一種不同進程或者同一進程不同線程直接通訊的方式。

Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到Broker;

Broker:消息處理中心。負(fù)責(zé)消息存儲、確認(rèn)、重試等,一般其中會包含多個queue;

Consumer:消息消費者,負(fù)責(zé)從Broker中獲取消息,并進行相應(yīng)處理;

1.1.2.2?常見消息隊列應(yīng)用

(1)、ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實現(xiàn)。

(2)、RabbitMQ

RabbitMQ是一個在AMQP基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng)。他遵循Mozilla Public License開源協(xié)議。開發(fā)語言為Erlang。

(3)、RocketMQ

由阿里巴巴定義開發(fā)的一套消息隊列應(yīng)用服務(wù)。

1.2?ActiveMQ能做什么

(1)實現(xiàn)兩個不同應(yīng)用(程序)之間的消息通訊。

(2)實現(xiàn)同一個應(yīng)用,不同模塊之間的消息通訊。(確保數(shù)據(jù)發(fā)送的穩(wěn)定性)

1.3?ActiveMQ下載

ActiveMQ下載地址:http://activemq.apache.org/download-archives.html

--可供下載的歷史版本

--說明:

ActiveMQ 5.10.x以上版本必須使用JDK1.8才能正常使用。

ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。

--根據(jù)操作系統(tǒng),選擇下載版本。(本教程下載Linux版本)

1.4?ActiveMQ主要特點

(1)支持多語言、多協(xié)議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應(yīng)用協(xié)議:OpenWire,Stomp REST,WS Notification,XMPP,AMQP

(2)對Spring的支持,ActiveMQ可以很容易整合到Spring的系統(tǒng)里面去。

(3)支持高可用、高性能的集群模式。

2?入門示例

2.1?需求

使用ActiveMQ實現(xiàn)消息隊列模型。

2.2?配置步驟說明

(1)搭建ActiveMQ消息服務(wù)器。

(2)創(chuàng)建一個java項目。

(3)創(chuàng)建消息生產(chǎn)者,發(fā)送消息。

(4)創(chuàng)建消息消費者,接收消息。

2.3?第一部分:搭建ActiveMQ消息服務(wù)器

2.3.1?第一步:下載、上傳至Linux

--說明:確保已經(jīng)安裝了jdk

2.3.2?第二步:安裝到/usr/local/activemq目錄

(1)解壓到/usr/local目錄下

[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz?-C /usr/local

(2)修改名稱為activemq

[root@node07192 ~]# cd /usr/local/

[root@node07192 local]# mv apache-activemq-5.9.0/ activemq

2.3.3?第三步:啟動ActiveMQ服務(wù)器

--說明:ActiveMQ是免安裝軟件,解壓即可啟動服務(wù)。

[root@node07192 local]# cd activemq/bin

[root@node07192 bin]# ./activemq start

--查看ActiveMQ啟動狀態(tài)

[root@node07192 bin]# ./activemq status

2.3.4?第四步:瀏覽器訪問ActiveMQ管理界面

2.3.4.1?Step1:查看ActiveMQ管理界面的服務(wù)端口。在/conf/jetty.xml中

--訪問管理控制臺的服務(wù)端口,默認(rèn)為:8161

[root@node07192 bin]# cd ../conf

[root@node07192 conf]# vim jetty.xml

2.3.4.2?Step2:查看ActiveMQ用戶、密碼。在/conf/users.properties中:

--默認(rèn)的用戶名、密碼均為amdin

[root@node07192 conf]# vim users.properties

2.3.4.3?Step3:訪問ActiveMQ管理控制臺。地址:http://ip:8161/

--注意:防火墻是沒有配置該服務(wù)的端口的。

因此,要訪問該服務(wù),必須在防火墻中配置。

(1)修改防火墻,開放8161端口

[root@node07192 conf]# vim /etc/sysconfig/iptables

(2)重啟防火墻

[root@node07192 conf]# service iptables restart

(3)登錄管理控制臺

--登陸,用戶名、密碼均為admin

--控制臺主界面

--搭建ActiveMQ服務(wù)器成功!!!

2.4?第二部分:創(chuàng)建java項目,導(dǎo)入jar包

--導(dǎo)包說明:

ActiveMQ的解壓包中,提供了運行ActiveMQ的所有jar。

--創(chuàng)建項目

2.5?第三部分:創(chuàng)建消息生成者,發(fā)送消息

--說明:ActiveMQ是實現(xiàn)了JMS規(guī)范的。在實現(xiàn)消息服務(wù)的時候,必須基于API接口規(guī)范。

2.5.1?JMS常用的API說明

下述API都是接口類型,定義在javax.jms包中,是JMS標(biāo)準(zhǔn)接口定義。ActiveMQ完全實現(xiàn)這一套api標(biāo)準(zhǔn)。

2.5.1.1?ConnectionFactory

鏈接工廠, 用于創(chuàng)建鏈接的工廠類型。

2.5.1.2?Connection

鏈接,用于建立訪問ActiveMQ連接的類型,由鏈接工廠創(chuàng)建。

2.5.1.3?Session

會話, 一次持久有效、有狀態(tài)的訪問,由鏈接創(chuàng)建。

2.5.1.4?Destination ?& ?Queue & Topic

目的地, 即本次訪問ActiveMQ消息隊列的地址,由Session會話創(chuàng)建。

(1)interfaceQueue?extends Destination

(2)Queue:隊列模型,只有一個消費者。消息一旦被消費,默認(rèn)刪除。

(3)Topic:主題訂閱中的消息,會發(fā)送給所有的消費者同時處理。

2.5.1.5?Message

消息,在消息傳遞過程中數(shù)據(jù)載體對象,是所有消息【文本消息TextMessage,對象消息ObjectMessage等】具體類型的頂級接口,可以通過會話創(chuàng)建或通過會話從ActiveMQ服務(wù)中獲取。

2.5.1.6?MessageProducer

消息生成者, 在一次有效會話中,用于發(fā)送消息給ActiveMQ服務(wù)的工具,由Session會話創(chuàng)建。

2.5.1.7?MessageCustomer

消息消費者【消息訂閱者,消息處理者】, 在一次有效會話中,用于ActiveMQ服務(wù)中獲取消息的工具,由Session會話創(chuàng)建。

我們定義的消息生產(chǎn)者和消費者,都是基于上面API實現(xiàn)的。

2.5.2?第一步:創(chuàng)建MyProducer類,定義sendMessage方法

package?cn.gzsxt.mq.producer;

import?javax.jms.Connection;

import?javax.jms.ConnectionFactory;

import?javax.jms.Destination;

import?javax.jms.JMSException;

import?javax.jms.Message;

import?javax.jms.MessageProducer;

import?javax.jms.Session;

import?org.apache.activemq.ActiveMQConnectionFactory;

public?class?MyProducer {

// 定義鏈接工廠

ConnectionFactory connectionFactory?= null;

// 定義鏈接

Connection connection?= null;

// 定義會話

Session session?= null;

// 定義目的地

Destination destination?= null;

// 定義消息生成者

MessageProducer producer?= null;

// 定義消息

Message message?= null;

public?void?sendToMQ(){

try{

/*

* 創(chuàng)建鏈接工廠

* ActiveMQConnectionFactory - 由ActiveMQ實現(xiàn)的ConnectionFactory接口實現(xiàn)類.

* 構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

* ?userName - 訪問ActiveMQ服務(wù)的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.

* ?password - 訪問ActiveMQ服務(wù)的密碼,密碼可以通過jetty-realm.properties配置文件配置.

* ?brokerURL - 訪問ActiveMQ服務(wù)的路徑地址.路徑結(jié)構(gòu)為-協(xié)議名://主機地址:端口號

* ?????此鏈接基于TCP/IP協(xié)議.

*/

connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

// 創(chuàng)建鏈接對象

connection?= connectionFactory.createConnection();

// 啟動鏈接

connection.start();

/*

* 創(chuàng)建會話對象

* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);

* ?transacted - 是否使用事務(wù),可選值為true|false

* ?????true - 使用事務(wù),當(dāng)設(shè)置此變量值,則acknowledgeMode參數(shù)無效,建議傳遞的acknowledgeMode參數(shù)值為

* ?????????Session.SESSION_TRANSACTED

* ?????false - 不使用事務(wù),設(shè)置此變量值,則acknowledgeMode參數(shù)必須設(shè)置.

* ?acknowledgeMode - 消息確認(rèn)機制,可選值為:

* ?????Session.AUTO_ACKNOWLEDGE - 自動確認(rèn)消息機制

* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機制

* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機制

*/

session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應(yīng)的隊列

destination?= session.createQueue("test-mq");

// 創(chuàng)建消息生成者,創(chuàng)建的消息生成者與某目的地對應(yīng),即方法參數(shù)目的地.

producer?= session.createProducer(destination);

// 創(chuàng)建消息對象,創(chuàng)建一個文本消息,此消息對象中保存要傳遞的文本數(shù)據(jù).

message?= session.createTextMessage("hello,activeme");

// 發(fā)送消息

producer.send(message);

System.out.println("消息發(fā)送成功!");

}catch(Exception e){

e.printStackTrace();

System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯誤!!");

}finally{

try?{

// 回收消息發(fā)送者資源

if(null?!= producer)

producer.close();

} catch?(JMSException e) {

e.printStackTrace();

}

try?{

// 回收會話資源

if(null?!= session)

session.close();

} catch?(JMSException e) {

e.printStackTrace();

}

try?{

// 回收鏈接資源

if(null?!= connection)

connection.close();

} catch?(JMSException e) {

e.printStackTrace();

}

}

}

}

2.5.3?第二步:創(chuàng)建一個測試類MessageTest

--添加junit類庫,快捷鍵ctrl+1

package?cn.gzsxt.mq.test;

import?org.junit.Test;

import?cn.gzsxt.mq.producer.MyProducer;

public?class?MessageTest {

@Test

public?void?sendToMQ(){

MyProducer producer?= new?MyProducer();

producer.sendToMQ();

}

}

2.5.4?第三步:測試

(1)設(shè)置防火墻,配置61616端口。注意修改之后重啟防火墻。

(2)測試結(jié)果:

--查看控制臺

--查看ActiveMQ管理控制界面

--消息發(fā)送成功!!!

2.6?第四部分:創(chuàng)建消息消費者,消費消息

2.6.1?第一步:創(chuàng)建MyConsumer類

package?cn.gzsxt.mq.consumer;

import?javax.jms.Connection;

import?javax.jms.ConnectionFactory;

import?javax.jms.Destination;

import?javax.jms.JMSException;

import?javax.jms.Message;

import?javax.jms.MessageConsumer;

import?javax.jms.Session;

import?javax.jms.TextMessage;

import?org.apache.activemq.ActiveMQConnectionFactory;

/**

* @ClassName:MyConsumer

* @Description: 消息消費者代碼

*/

public?class?MyConsumer {

// 定義鏈接工廠

ConnectionFactory connectionFactory?= null;

// 定義鏈接

Connection connection?= null;

// 定義會話

Session session?= null;

// 定義目的地

Destination destination?= null;

// 定義消息消費者

MessageConsumer consumer?= null;

// 定義消息

Message message?= null;

public?void?recieveFromMQ(){

try{

/*

* 創(chuàng)建鏈接工廠

* ActiveMQConnectionFactory - 由ActiveMQ實現(xiàn)的ConnectionFactory接口實現(xiàn)類.

* 構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

* ?userName - 訪問ActiveMQ服務(wù)的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.

* ?password - 訪問ActiveMQ服務(wù)的密碼,密碼可以通過jetty-realm.properties配置文件配置.

* ?brokerURL - 訪問ActiveMQ服務(wù)的路徑地址.路徑結(jié)構(gòu)為-協(xié)議名://主機地址:端口號

* ?????此鏈接基于TCP/IP協(xié)議.

*/

connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

// 創(chuàng)建鏈接對象

connection?= connectionFactory.createConnection();

// 啟動鏈接

connection.start();

/*

* 創(chuàng)建會話對象

* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);

* ?transacted - 是否使用事務(wù),可選值為true|false

* ?????true - 使用事務(wù),當(dāng)設(shè)置此變量值,則acknowledgeMode參數(shù)無效,建議傳遞的acknowledgeMode參數(shù)值為

* ?????????Session.SESSION_TRANSACTED

* ?????false - 不使用事務(wù),設(shè)置此變量值,則acknowledgeMode參數(shù)必須設(shè)置.

* ?acknowledgeMode - 消息確認(rèn)機制,可選值為:

* ?????Session.AUTO_ACKNOWLEDGE - 自動確認(rèn)消息機制

* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機制

* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機制

*/

session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應(yīng)的隊列

destination?= session.createQueue("test-mq");

// 創(chuàng)建消息消費者,創(chuàng)建的消息消費者與某目的地對應(yīng),即方法參數(shù)目的地.

consumer?= session.createConsumer(destination);

// 從ActiveMQ服務(wù)中獲取消息

message?= consumer.receive();

TextMessage tMsg?= (TextMessage) message;

System.out.println("從MQ中獲取的消息是:"+tMsg.getText());

}catch(Exception e){

e.printStackTrace();

System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯誤!!");

}finally{

try?{

// 回收消息消費者資源

if(null?!= consumer)

consumer.close();

} catch?(JMSException e) {

e.printStackTrace();

}

try?{

// 回收會話資源

if(null?!= session)

session.close();

} catch?(JMSException e) {

e.printStackTrace();

}

try?{

// 回收鏈接資源

if(null?!= connection)

connection.close();

} catch?(JMSException e) {

e.printStackTrace();

}

}

}

}

2.6.2?第二步:修改測試類MessageTest,新增測試方法

@Test

public?void?recieveFromMQ(){

MyConsumer consumer?= new?MyConsumer();

consumer.recieveFromMQ();

}

2.6.3?第三步:測試

--查看Eclipse控制臺

--查看ActiveMQ管理控制界面

--消息被消費了,測試成功!!!

3?ActiveMQ監(jiān)聽器

問題:在前面的示例中,我們發(fā)現(xiàn)消費者每次只能消費一條消息。當(dāng)隊列中有多條消息的時候,我們需要多次運行消費者,才能消費完這些消息。很麻煩!!!!如何解決這個問題呢?我們希望一次將所有的消息全部接收。

答:使用ActiveMQ監(jiān)聽器來監(jiān)聽隊列,持續(xù)消費消息。

3.1?配置步驟說明

(1)創(chuàng)建一個監(jiān)聽器對象。

(2)修改消費者代碼,加載監(jiān)聽器。

3.2?配置步驟

3.2.1?第一步:創(chuàng)建監(jiān)聽器MyListener類

--說明:自定義監(jiān)聽器需要實現(xiàn)MessageListener接口

package?cn.gzsxt.mq.listener;

import?javax.jms.JMSException;

import?javax.jms.Message;

import?javax.jms.MessageListener;

import?javax.jms.TextMessage;

public?class?MyListener implements?MessageListener{

@Override

public?void?onMessage(Message message) {

if(null!=message){

TextMessage tMsg?= (TextMessage) message;

try?{

System.out.println("從MQ中獲取的消息是:"+tMsg.getText());

} catch?(JMSException e) {

e.printStackTrace();

}

}

}

}

3.2.2?第二步:修改MyConsumer代碼,加載監(jiān)聽器

--說明:監(jiān)聽器需要持續(xù)加載,因此消費程序不能結(jié)束。

這里我們使用輸入流阻塞消費線程結(jié)束。(實際開發(fā)中,使用web項目加載)

package?cn.gzsxt.mq.consumer;

import?javax.jms.Connection;

import?javax.jms.ConnectionFactory;

import?javax.jms.Destination;

import?javax.jms.JMSException;

import?javax.jms.Message;

import?javax.jms.MessageConsumer;

import?javax.jms.Session;

import?javax.jms.TextMessage;

import?org.apache.activemq.ActiveMQConnectionFactory;

import?cn.gzsxt.mq.listener.MyListener;

/**

* @ClassName:MyConsumer

* @Description: 消息消費者代碼

*/

public?class?MyConsumer {

// 定義鏈接工廠

ConnectionFactory connectionFactory?= null;

// 定義鏈接

Connection connection?= null;

// 定義會話

Session session?= null;

// 定義目的地

Destination destination?= null;

// 定義消息消費者

MessageConsumer consumer?= null;

// 定義消息

Message message?= null;

public?Message recieveFromMQ(){

try{

/*

* 創(chuàng)建鏈接工廠

* ActiveMQConnectionFactory - 由ActiveMQ實現(xiàn)的ConnectionFactory接口實現(xiàn)類.

* 構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

* ?userName - 訪問ActiveMQ服務(wù)的用戶名,用戶名可以通過jetty-realm.properties配置文件配置.

* ?password - 訪問ActiveMQ服務(wù)的密碼,密碼可以通過jetty-realm.properties配置文件配置.

* ?brokerURL - 訪問ActiveMQ服務(wù)的路徑地址.路徑結(jié)構(gòu)為-協(xié)議名://主機地址:端口號

* ?????此鏈接基于TCP/IP協(xié)議.

*/

connectionFactory?= new?ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

// 創(chuàng)建鏈接對象

connection?= connectionFactory.createConnection();

// 啟動鏈接

connection.start();

/*

* 創(chuàng)建會話對象

* 方法- connection.createSession(boolean transacted,int?acknowledgeMode);

* ?transacted - 是否使用事務(wù),可選值為true|false

* ?????true - 使用事務(wù),當(dāng)設(shè)置此變量值,則acknowledgeMode參數(shù)無效,建議傳遞的acknowledgeMode參數(shù)值為

* ?????????Session.SESSION_TRANSACTED

* ?????false - 不使用事務(wù),設(shè)置此變量值,則acknowledgeMode參數(shù)必須設(shè)置.

* ?acknowledgeMode - 消息確認(rèn)機制,可選值為:

* ?????Session.AUTO_ACKNOWLEDGE - 自動確認(rèn)消息機制

* ?????Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機制

* ?????Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機制

*/

session?= connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建目的地,目的地命名即隊列命名,消息消費者需要通過此命名訪問對應(yīng)的隊列

destination?= session.createQueue("test-mq");

// 創(chuàng)建消息消費者,創(chuàng)建的消息消費者與某目的地對應(yīng),即方法參數(shù)目的地.

consumer?= session.createConsumer(destination);

// // 從ActiveMQ服務(wù)中獲取消息

// message = consumer.receive();

//

// TextMessage tMsg = (TextMessage) message;

//

// System.out.println("從MQ中獲取的消息是:"+tMsg.getText());

//加載監(jiān)聽器

consumer.setMessageListener(newMyListener());

//監(jiān)聽器需要持續(xù)加載,這里我們使用輸入流阻塞當(dāng)前線程結(jié)束。

System.in.read();

}catch(Exception e){

e.printStackTrace();

System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯誤!!");

}finally{

try?{

// 回收消息消費者資源

if(null?!= consumer)

consumer.close();

} catch?(JMSException e) {

e.printStackTrace();

}

try?{

// 回收會話資源

if(null?!= session)

session.close();

} catch?(JMSException e) {

e.printStackTrace();

}

try?{

// 回收鏈接資源

if(null?!= connection)

connection.close();

} catch?(JMSException e) {

e.printStackTrace();

}

}

return?message;

}

}

3.3?測試

(1)多次運行生產(chǎn)者,發(fā)送多條消息到隊列中。

(2)運行消費者。觀察結(jié)果

--查看Eclipse控制臺,一次消費了3條消息

--查看ActiveMQ管理控制界面,所有消息都被消費了!

--測試成功!!!

4?ActiveMQ消息服務(wù)模式

問題:在入門示例中,只能向一個消費者發(fā)送消息。但是有一些場景,需求有多個消費者都能接收到消息,比如:美團APP每天的消息推送。該如何實現(xiàn)呢?

答:ActiveMQ是通過不同的服務(wù)模式來解決這個問題的。

所以,要搞清楚這個問題,必須知道ActiveMQ有哪些應(yīng)用模式。

4.1?PTP模式(point to point)

--消息模型

消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費者從queue中取出并且消費消息。

消息被消費以后,queue中不再有存儲,所以消息消費者不可能消費到已經(jīng)被消費的消息。

Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費、其它的則不能消費此消息了。

當(dāng)消費者不存在時,消息會一直保存,直到有消費消費

我們的入門示例,就是采用的這種PTP服務(wù)模式。

4.2?TOPIC(主題訂閱模式)

--消息模型

消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時有多個消息消費者(訂閱)消費該消息。

和點對點方式不同,發(fā)布到topic的消息會被所有訂閱者消費。

當(dāng)生產(chǎn)者發(fā)布消息,不管是否有消費者。都不會保存消息

所以,主題訂閱模式下,一定要先有消息的消費者(訂閱者),后有消息的生產(chǎn)者(發(fā)布者)。

我們前面已經(jīng)實現(xiàn)了PTP模式,下面我們來實現(xiàn)TOPIC模式。

5?Topic模式實現(xiàn)

5.1?配置步驟說明

發(fā)表于 2019-08-01 00:00

閱讀 ( 411 )

總結(jié)

以上是生活随笔為你收集整理的java 消息队列服务_ActiveMQ 消息队列服务的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。