日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

ActiveMQ_3Java实现

發(fā)布時間:2025/3/21 java 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMQ_3Java实现 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

Java實現(xiàn)

?

添加相應(yīng)的jar包

<dependency>

? <groupId>org.apache.activemq</groupId>

? <artifactId>activemq-all</artifactId>

? <version>x.xx.x</version>

</dependency>

?

創(chuàng)建生產(chǎn)者類(點對點)

public class ProducerTest {

??? // 異步發(fā)送asyn

??? // 死信隊列 DLQ

??? // 文件上傳

??? // header properties使用

??? // jdbc存儲

??? // byteMsg objMsg inputMsg

??? // mq ptp 的使用場景

??? // mq中所有的隊列名以及每個隊列中未被消費的消息數(shù)量

??? // mq sub/pub 的使用場景

??? @Test

??? public void testQueueProducer() throws JMSException{

?????? // 第一步:創(chuàng)建ConnectionFactory對象,需要指定服務(wù)端ip及端口號。

?????? //brokerURL服務(wù)器的ip及端口號

?????? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.114.129:61616");

?????? // 第二步:使用ConnectionFactory對象創(chuàng)建一個Connection對象

?????? Connection connection = connectionFactory.createConnection();

?????? // 第三步:開啟連接,調(diào)用Connection對象的start方法

?????? connection.start();

?????? // 第四步:使用Connection對象創(chuàng)建一個Session對象。

?????? //第一個參數(shù):是否開啟事務(wù)。true:開啟事務(wù),第二個參數(shù)忽略。

?????? //第二個參數(shù):當?shù)谝粋€參數(shù)為false時,才有意義。消息的應(yīng)答模式。1、自動應(yīng)答2、手動應(yīng)答。一般是自動應(yīng)答

?????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

?????? // 第五步:使用Session對象創(chuàng)建一個Destination對象(topic、queue),此處創(chuàng)建一個Queue對象。

?????? //參數(shù):隊列的名稱。

?????? Queue queue = session.createQueue("test-queue");

?????? // 第六步:使用Session對象創(chuàng)建一個Producer對象。

?????? MessageProducer producer = session.createProducer(queue);

?????? producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

?????? // 第七步:創(chuàng)建一個Message對象,創(chuàng)建一個TextMessage對象。

?????? /*TextMessage message = new ActiveMQTextMessage();

?????? message.setText("hello activeMq,this is my first test.");*/

?????? TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");

?????? producer.send(textMessage);

?????? producer.close();

?????? session.close();

?????? connection.close();

??? }

}

?

創(chuàng)建消費者類(點對點)

public class ConsumerTest {

??? @Test

??? public void testQueueConsumer() throws JMSException, IOException{

?????? // 第一步:創(chuàng)建一個ConnectionFactory對象。

??????? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.114.129:61616");

??????? // 第二步:從ConnectionFactory對象中獲得一個Connection對象。

??????? Connection connection = connectionFactory.createConnection();

??????? // 第三步:開啟連接。調(diào)用Connection對象的start方法。

????? ??connection.start();

??????? // 第四步:使用Connection對象創(chuàng)建一個Session對象。

??????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

??????? // 第五步:使用Session對象創(chuàng)建一個Destination對象。和發(fā)送端保持一致queue,并且隊列的名稱一致。

??????? Queue queue = session.createQueue("queue.stu");

??????? // 第六步:使用Session對象創(chuàng)建一個Consumer對象。

??????? MessageConsumer consumer = session.createConsumer(queue);

??????? // 第七步:接收消息。

??????? consumer.setMessageListener(new MessageListener()

???????????? @Override

???????????? public void onMessage(Message message) {

????????????????? try {

??????????????? ? ??//if (TextMessage.class.isAssignableFrom(message.getClass()))

??????????????? ? ??if (message instanceof TextMessage) {

??? ????????????????????? TextMessage textMessage = (TextMessage) message;

??? ????????????????????? String text = null;

??? ????????????????????? //取消息的內(nèi)容

??? ????????????????????? text = textMessage.getText();

??? ????????????????????? // 第八步:打印消息。

??? ????????????????????? System.out.println(text);

??????????????? ? ??}

????????????????? } catch (JMSException e) {

????????????????????? e.printStackTrace();

????????????????? }

???????????? }

??????? });

??????? //等待鍵盤輸入

??????? System.in.read();

??????? // 第九步:關(guān)閉資源

??????? consumer.close();

??????? session.close();

?????? ?connection.close();

??? }

}

?

運行active服務(wù)器驗證實現(xiàn)情況

?

發(fā)布(對點對點代碼修改 創(chuàng)建Topic Destination)

// 第五步:使用Session對象創(chuàng)建一個Destination對象(topic、queue),此處創(chuàng)建一個topic對象。

// 參數(shù):話題的名稱。

Topic topic = session.createTopic("test-topic");

// 第六步:使用Session對象創(chuàng)建一個Producer對象。

MessageProducer producer = session.createProducer(topic);

?

訂閱(對點對點代碼修改 創(chuàng)建Topic Destination)

// 第五步:使用Session對象創(chuàng)建一個Destination對象。和發(fā)送端保持一致topic,并且話題的名稱一致。

Topic topic = session.createTopic("test-topic");

// 第六步:使用Session對象創(chuàng)建一個Consumer對象。

MessageConsumer consumer = session.createConsumer(topic);

?

轉(zhuǎn)載于:https://www.cnblogs.com/zhiboluo/p/10114647.html

總結(jié)

以上是生活随笔為你收集整理的ActiveMQ_3Java实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。