ActiveMQ_3Java实现
Java實現(xiàn)
?
添加相應(yīng)的jar包
<dependency>
? <groupId>org.apache.activemq</groupId>
? <artifactId>activemq-all</artifactId>
? <version>x.xx.x</version>
</dependency>
?
創(chuàng)建生產(chǎn)者類(點對點)
public class ProducerTest {
??? // 異步發(fā)送asyn
??? // 死信隊列 DLQ
??? // 文件上傳
??? // header properties使用
??? // jdbc存儲
??? // byteMsg objMsg inputMsg
??? // mq ptp 的使用場景
??? // mq中所有的隊列名以及每個隊列中未被消費的消息數(shù)量
??? // mq sub/pub 的使用場景
??? @Test
??? public void testQueueProducer() throws JMSException{
?????? // 第一步:創(chuàng)建ConnectionFactory對象,需要指定服務(wù)端ip及端口號。
?????? //brokerURL服務(wù)器的ip及端口號
?????? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.114.129:61616");
?????? // 第二步:使用ConnectionFactory對象創(chuàng)建一個Connection對象
?????? Connection connection = connectionFactory.createConnection();
?????? // 第三步:開啟連接,調(diào)用Connection對象的start方法
?????? connection.start();
?????? // 第四步:使用Connection對象創(chuàng)建一個Session對象。
?????? //第一個參數(shù):是否開啟事務(wù)。true:開啟事務(wù),第二個參數(shù)忽略。
?????? //第二個參數(shù):當?shù)谝粋€參數(shù)為false時,才有意義。消息的應(yīng)答模式。1、自動應(yīng)答2、手動應(yīng)答。一般是自動應(yīng)答
?????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
?????? // 第五步:使用Session對象創(chuàng)建一個Destination對象(topic、queue),此處創(chuàng)建一個Queue對象。
?????? //參數(shù):隊列的名稱。
?????? Queue queue = session.createQueue("test-queue");
?????? // 第六步:使用Session對象創(chuàng)建一個Producer對象。
?????? MessageProducer producer = session.createProducer(queue);
?????? producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
?????? // 第七步:創(chuàng)建一個Message對象,創(chuàng)建一個TextMessage對象。
?????? /*TextMessage message = new ActiveMQTextMessage();
?????? message.setText("hello activeMq,this is my first test.");*/
?????? TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
?????? producer.send(textMessage);
?????? producer.close();
?????? session.close();
?????? connection.close();
??? }
}
?
創(chuàng)建消費者類(點對點)
public class ConsumerTest {
??? @Test
??? public void testQueueConsumer() throws JMSException, IOException{
?????? // 第一步:創(chuàng)建一個ConnectionFactory對象。
??????? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.114.129:61616");
??????? // 第二步:從ConnectionFactory對象中獲得一個Connection對象。
??????? Connection connection = connectionFactory.createConnection();
??????? // 第三步:開啟連接。調(diào)用Connection對象的start方法。
????? ??connection.start();
??????? // 第四步:使用Connection對象創(chuàng)建一個Session對象。
??????? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
??????? // 第五步:使用Session對象創(chuàng)建一個Destination對象。和發(fā)送端保持一致queue,并且隊列的名稱一致。
??????? Queue queue = session.createQueue("queue.stu");
??????? // 第六步:使用Session對象創(chuàng)建一個Consumer對象。
??????? MessageConsumer consumer = session.createConsumer(queue);
??????? // 第七步:接收消息。
??????? consumer.setMessageListener(new MessageListener()
???????????? @Override
???????????? public void onMessage(Message message) {
????????????????? try {
??????????????? ? ??//if (TextMessage.class.isAssignableFrom(message.getClass()))
??????????????? ? ??if (message instanceof TextMessage) {
??? ????????????????????? TextMessage textMessage = (TextMessage) message;
??? ????????????????????? String text = null;
??? ????????????????????? //取消息的內(nèi)容
??? ????????????????????? text = textMessage.getText();
??? ????????????????????? // 第八步:打印消息。
??? ????????????????????? System.out.println(text);
??????????????? ? ??}
????????????????? } catch (JMSException e) {
????????????????????? e.printStackTrace();
????????????????? }
???????????? }
??????? });
??????? //等待鍵盤輸入
??????? System.in.read();
??????? // 第九步:關(guān)閉資源
??????? consumer.close();
??????? session.close();
?????? ?connection.close();
??? }
}
?
運行active服務(wù)器驗證實現(xiàn)情況
?
發(fā)布(對點對點代碼修改 創(chuàng)建Topic Destination)
// 第五步:使用Session對象創(chuàng)建一個Destination對象(topic、queue),此處創(chuàng)建一個topic對象。
// 參數(shù):話題的名稱。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對象創(chuàng)建一個Producer對象。
MessageProducer producer = session.createProducer(topic);
?
訂閱(對點對點代碼修改 創(chuàng)建Topic Destination)
// 第五步:使用Session對象創(chuàng)建一個Destination對象。和發(fā)送端保持一致topic,并且話題的名稱一致。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對象創(chuàng)建一個Consumer對象。
MessageConsumer consumer = session.createConsumer(topic);
?
轉(zhuǎn)載于:https://www.cnblogs.com/zhiboluo/p/10114647.html
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ_3Java实现的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Python 中的 lstrip、rst
- 下一篇: Java正则表达式应用