日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMQ之发布- 订阅消息模式实现

發布時間:2025/3/15 编程问答 12 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMQ之发布- 订阅消息模式实现 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

一、概念

  • 發布者/訂閱者模型支持向一個特定的消息主題發布消息。0或多個訂閱者可能對接收來自特定消息主題的消息感興趣。在這種模型下,發布者和訂閱者彼此不知道對方。這種模式好比是匿名公告板。這種模式被概括為:多個消費者可以獲得消息
  • 在發布者和訂閱者之間存在時間依賴性。發布者需要建立一個訂閱(subscription),以便客戶能夠訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者建立了持久的訂閱。在那種情況下,在訂閱者未連接時發布的消息將在訂閱者重新連接時重新發布。

二、案例

  2.1  消息生產者-消息發布者

package com.shyroke.firstActiveMQ2;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;/*** 消息生產者* @author Administrator**/ public class JMSProducer {private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默認的連接用戶名private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默認的連接密碼private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默認的連接地址private static final int SENDNUM=10; // 發送的消息數量public static void main(String[] args) {ConnectionFactory connectionFactory; // 連接工廠Connection connection = null; // 連接Session session; // 會話 接受或者發送消息的線程Destination destination; // 消息的目的地MessageProducer messageProducer; // 消息生產者// 實例化連接工廠connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);try {connection=connectionFactory.createConnection(); // 通過連接工廠獲取連接connection.start(); // 啟動連接session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 創建Session // destination=session.createQueue("FirstQueue1"); // 創建消息隊列destination=session.createTopic("firstTopic");messageProducer=session.createProducer(destination); // 創建消息生產者sendMessage(session, messageProducer); // 發送消息session.commit();} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} finally{if(connection!=null){try {connection.close();} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}/*** 發送消息* @param session* @param messageProducer* @throws Exception*/public static void sendMessage(Session session,MessageProducer messageProducer)throws Exception{for(int i=0;i<JMSProducer.SENDNUM;i++){TextMessage message=session.createTextMessage("ActiveMQ 發布的消息"+i);System.out.println("發送消息:"+"ActiveMQ 發布的消息"+i);messageProducer.send(message);}} }

?

?

  2.2  消息消費者-消息訂閱者一

package com.shyroke.firstActiveMQ2;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class JMSConsumer {private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默認的連接用戶名private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默認的連接密碼private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默認的連接地址public static void main(String[] args) {ConnectionFactory connectionFactory; // 連接工廠Connection connection = null; // 連接Session session; // 會話 接受或者發送消息的線程Destination destination; // 消息的目的地MessageConsumer consumer; //創建消費者// 實例化連接工廠connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);try {connection=connectionFactory.createConnection(); // 通過連接工廠獲取連接connection.start(); // 啟動連接/*** 這里的最好使用Boolean.FALSE,如果是用true則必須commit才能生效,且http://127.0.0.1:8161/admin管理頁面才會更新消息隊列的變化情況。*/session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建Session // destination=session.createQueue("FirstQueue1"); // 創建消息隊列destination=session.createTopic("firstTopic");consumer=session.createConsumer(destination);consumer.setMessageListener(new MyListener()); // 注冊消息監聽} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} }}

?

  • 監聽器1

package com.shyroke.firstActiveMQ2;import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;public class MyListener implements MessageListener {public void onMessage(Message message) {try {System.out.println("訂閱者一收到的消息:"+((TextMessage)message).getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}

?

?

  2.3  消息消費者-消息訂閱者二

package com.shyroke.firstActiveMQ2;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class JMSConsumer2 {private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默認的連接用戶名private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默認的連接密碼private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默認的連接地址public static void main(String[] args) {ConnectionFactory connectionFactory; // 連接工廠Connection connection = null; // 連接Session session; // 會話 接受或者發送消息的線程Destination destination; // 消息的目的地MessageConsumer consumer; //創建消費者// 實例化連接工廠connectionFactory=new ActiveMQConnectionFactory(JMSConsumer2.USERNAME, JMSConsumer2.PASSWORD, JMSConsumer2.BROKEURL);try {connection=connectionFactory.createConnection(); // 通過連接工廠獲取連接connection.start(); // 啟動連接/*** 這里的最好使用Boolean.FALSE,如果是用true則必須commit才能生效,且http://127.0.0.1:8161/admin管理頁面才會更新消息隊列的變化情況。*/session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 創建Session // destination=session.createQueue("FirstQueue1"); // 創建消息隊列destination=session.createTopic("firstTopic");consumer=session.createConsumer(destination);consumer.setMessageListener(new MyListener2()); // 注冊消息監聽} catch (Exception e) {// TODO Auto-generated catch blocke.printStackTrace();} } }

?

  • 監聽器2

package com.shyroke.firstActiveMQ2;import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage;public class MyListener implements MessageListener {public void onMessage(Message message) {try {System.out.println("訂閱者一收到的消息:"+((TextMessage)message).getText());} catch (JMSException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}

?

?

  • 測試以及結果

發布- 訂閱消息模式首先必須訂閱者先訂閱服務,然后發布者再發布消息,最后訂閱者受到服務消息。

所以本章我們先執行消息訂閱者一和消息訂閱者二的代碼然后查看ActiveMQ的管理界面:

如圖可見,訂閱者已經注冊成功,然后再發布者發布消息:

總結

以上是生活随笔為你收集整理的ActiveMQ之发布- 订阅消息模式实现的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。