日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMQ的queue以及topic两种消息处理机制分析

發布時間:2024/4/14 编程问答 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMQ的queue以及topic两种消息处理机制分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉自: http://itindex.net/detail/50057-activemq-queue-topic

?

上一期介紹了我們項目要用到activeMQ來作為jms總線,并且給大家介紹了activeMQ的集群和高可用部署方案,本期給大家再介紹下,如何根據自己的項目需求,更好地使用activeMQ的兩種消息處理模式。

?

???????

?

1??? queue與topic的技術特點對比

?

???????????? topic??????????????????????????????????????????????????????????????????? queue

?

?????

????? topic

概要

Publish Subscribe messaging 發布訂閱消息

Point-to-Point 點對點

有無狀態

topic數據默認不落地,是無狀態的。

Queue數據默認會在mq服務器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存儲。

完整性保障

并不保證publisher發布的每條數據,Subscriber都能接受到。

Queue保證每條數據都能被receiver接收。

消息是否會丟失

一般來說publisher發布消息到某一個topic時,只有正在監聽該topic地址的sub能夠接收到消息;如果沒有sub在監聽,該topic就丟失了。

Sender發送消息到目標Queue,receiver可以異步接收這個Queue上的消息。Queue上的消息如果暫時沒有receiver來取,也不會丟失。

消息發布接收策略

一對多的消息發布接收策略,監聽同一個topic地址的多個sub都能收到publisher發送的消息。Sub接收完通知mq服務器

一對一的消息發布接收策略,一個sender發送的消息,只能有一個receiver接收。receiver接收完后,通知mq服務器已接收,mq服務器對queue里的消息采取刪除或其他操作。

?

????????? Topic和queue的最大區別在于topic是以廣播的形式,通知所有在線監聽的客戶端有新的消息,沒有監聽的客戶端將收不到消息;而queue則是以點對點的形式通知多個處于監聽狀態的客戶端中的一個。

?

?

?

2??? topic和queue方式的消息處理效率比較

?

??????? 通過增加監聽客戶端的并發數來驗證,topic的消息推送,是否會因為監聽客戶端的并發上升而出現明顯的下降,測試環境的服務器為ci環境的ActiveMQ,客戶端為我的本機。

?

??????? 從實測的結果來看,topic方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者(線程)并發的 前提下,效率差異很明顯(由于500線程并發的情況下,我本機的cpu占用率已高達70-90%,所以無法確認是我本機測試造成的性能瓶頸還是topic 消息發送方式存在性能瓶頸,造成效率下降如此明顯)。

?

??????? Topic方式發送的消息與queue方式發送的消息,發送和接收的效率,在一個訂閱者和100個訂閱者的前提下沒有明顯差異,但在500個訂閱者并發的前提下,topic方式的效率明顯低于queue。

?

??????? Queue方式發送的消息,在一個訂閱者、100個訂閱者和500個訂閱者的前提下,發送和接收的效率沒有明顯變化。

?

Topic實測數據:

?

?

?

 

發送者發送的消息總數

所有訂閱者接收到消息的總數

消息發送和接收平均耗時

單訂閱者

100

100

101ms

100訂閱者

100

10000

103ms

500訂閱者

100

50000

14162ms

?

?

?

Queue實測數據:

?

?

?

 

發送者發送的消息總數

所有訂閱者接收到消息的總數

消息發送和接收平均耗時

單訂閱者

100

100

96ms

100訂閱者

100

100

96ms

500訂閱者

100

100

100ms

?

?

?

3???? topic方式的消息處理示例

?

3.1???? 通過客戶端代碼調用來發送一個topic的消息:

?

import javax.jms.Connection;

?

import javax.jms.ConnectionFactory;

?

import javax.jms.DeliveryMode;

?

import javax.jms.Destination;

?

import javax.jms.MessageProducer;

?

import javax.jms.Session;

?

import javax.jms.TextMessage;

?

?

?

import org.apache.activemq.ActiveMQConnection;

?

import org.apache.activemq.ActiveMQConnectionFactory;

?

?

?

publicclass SendTopic {

?

??? privatestaticfinalint SEND_NUMBER = 5;

?

??? publicstaticvoid sendMessage(Session session, MessageProducer producer)

?

??????????? throws Exception {

?

?? ????? for ( int i = 1; i <= SEND_NUMBER; i++) {

?

??????????? TextMessage message = session

?

??????????????????? .createTextMessage("ActiveMq發送的消息" + i);

?

??????????? //發送消息到目的地方

?

??????????? System. out.println("發送消息:" + "ActiveMq 發送的消息" + i);

?

??????????? producer.send(message);

?

??????? }

?

??? }

?

???

?

??? publicstaticvoid main(String[] args) {

?

??????? // ConnectionFactory:連接工廠,JMS用它創建連接

?

??????? ConnectionFactory connectionFactory;

?

??????? // Connection:JMS客戶端到JMS Provider的連接

?

??????? Connection connection = null;

?

??????? // Session:一個發送或接收消息的線程

?

??????? Session session;

?

??????? // Destination:消息的目的地;消息發送給誰.

?

??????? Destination destination;

?

??????? // MessageProducer:消息發送者

?

??????? MessageProducer producer;

?

??????? // TextMessage message;

?

??????? //構造ConnectionFactory實例對象,此處采用ActiveMq的實現jar

?

??????? connectionFactory = new ActiveMQConnectionFactory(

?

??????????????? ActiveMQConnection. DEFAULT_USER,

?

??????????????? ActiveMQConnection. DEFAULT_PASSWORD,

?

??????????????? "tcp://10.20.8.198:61616");

?

??????? try {

?

??????????? //構造從工廠得到連接對象

?

??????????? connection = connectionFactory.createConnection();

?

??????????? //啟動

?

??????????? connection.start();

?

??????????? //獲取操作連接

?

??????????? session = connection.createSession( true, Session. AUTO_ACKNOWLEDGE);

?

??????????? //獲取session注意參數值FirstTopic是一個服務器的topic(與queue消息的發送相比,這里是唯一的不同)

?

??????????? destination = session.createTopic("FirstTopic");

?

??????? ????//得到消息生成者【發送者】

?

??????????? producer = session.createProducer(destination);

?

??????????? //設置不持久化,此處學習,實際根據項目決定

?

??????????? producer.setDeliveryMode(DeliveryMode. PERSISTENT);

?

??????????? //構造消息,此處寫死,項目就是參數,或者方法獲取

?

??????????? sendMessage(session, producer);

?

??????????? session.commit();

?

??????? } catch (Exception e) {

?

??????????? e.printStackTrace();

?

??????? } finally {

?

??????????? try {

?

??????????????? if ( null != connection)

?

??????????????????? connection.close();

?

??????????? } catch (Throwable ignore) {

?

??????????? }

?

??????? }

?

??? }

?

}

?

?

?

3.2???? 啟動多個客戶端監聽來接收topic的消息:

?

publicclass ReceiveTopic implements Runnable {

?

????? private StringthreadName;

?

?

?

????? ReceiveTopic(String threadName) {

?

?????????? this.threadName = threadName;

?

????? }

?

?

?

????? publicvoid run() {

?

?????????? // ConnectionFactory:連接工廠,JMS用它創建連接

?

?????????? ConnectionFactory connectionFactory;

?

?????????? // Connection:JMS客戶端到JMS Provider的連接

?

?????????? Connection connection = null;

?

?????????? // Session:一個發送或接收消息的線程

?

?????????? Session session;

?

?????????? // Destination:消息的目的地;消息發送給誰.

?

?????????? Destination destination;

?

?????????? //消費者,消息接收者

?

?????????? MessageConsumer consumer;

?

?????????? connectionFactory = new ActiveMQConnectionFactory(

?

????????????????????? ActiveMQConnection. DEFAULT_USER,

?

????????????????????? ActiveMQConnection. DEFAULT_PASSWORD,"tcp://10.20.8.198:61616");

?

?????????? try {

?

???????????????? //構造從工廠得到連接對象

?

???????????????? connection = connectionFactory.createConnection();

?

???????????????? //啟動

?

???????????????? connection.start();

?

???????????????? //獲取操作連接,默認自動向服務器發送接收成功的響應

?

???????????????? session = connection.createSession( false, Session. AUTO_ACKNOWLEDGE);

?

???????????????? //獲取session注意參數值FirstTopic是一個服務器的topic

?

???????????????? destination = session.createTopic("FirstTopic");

?

???????????????? consumer = session.createConsumer(destination);

?

???????????????? while ( true) {

?

????????????????????? //設置接收者接收消息的時間,為了便于測試,這里設定為100s

?

????????????????????? TextMessage message = (TextMessage) consumer

?

????????????????????????????????? .receive(100 * 1000);

?

????????????????????? if ( null != message) {

?

??????????????????????????? System. out.println("線程"+threadName+"收到消息:" + message.getText());

?

????????????????????? } else {

?

??????????????????????????? continue;

?

????????????????????? }

?

???????????????? }

?

?????????? } catch (Exception e) {

?

???????????????? e.printStackTrace();

?

?????????? } finally {

?

???????????????? try {

?

????????????????????? if ( null != connection)

?

??????????????????????????? connection.close();

?

???????????????? } catch (Throwable ignore) {

?

???????????????? }

?

?????????? }

?

????? }

?

?

?

????? publicstaticvoid main(String[] args) {

?

????? ????? //這里啟動3個線程來監聽FirstTopic的消息,與queue的方式不一樣三個線程都能收到同樣的消息

?

?????????? ReceiveTopic receive1= new ReceiveTopic("thread1");

?

?????????? ReceiveTopic receive2= new ReceiveTopic("thread2");

?

?????????? ReceiveTopic receive3= new ReceiveTopic("thread3");

?

?????????? Thread thread1= new Thread(receive1);

?

?????????? Thread thread2= new Thread(receive2);

?

?????????? Thread thread3= new Thread(receive3);

?

?????????? thread1.start();

?

?????????? thread2.start();

?

?????????? thread3.start();

?

????? }

?

}

?

?

?

4???? queue方式的消息處理示例

?

總結

以上是生活随笔為你收集整理的ActiveMQ的queue以及topic两种消息处理机制分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。