ActiveMQ 事务消息 手工签收
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ 事务消息 手工签收
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
發(fā)送端/生產(chǎn)者(帶有事務(wù)):
import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] args) throws Exception{//第一步:建立connectionFactory工廠對(duì)象【需填入用戶名、密碼、要連接的地址】ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","1234","tcp://localhost:61616");//第二步:通過(guò)ConnectionFactory工廠對(duì)象創(chuàng)建一個(gè)Connection連接,并且調(diào)用Connection 的start方法開(kāi)啟連接【connection默認(rèn)是關(guān)閉的】Connection connection = connectionFactory.createConnection();connection.start();//第三步:通過(guò)connection創(chuàng)建session會(huì)話(上下文環(huán)境對(duì)象),用于接收消息,參數(shù)配置1為是否啟用事務(wù), 參數(shù)配置2為簽收模式【一般我們?cè)O(shè)置為自動(dòng)簽收】Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);//使用事務(wù)的方式進(jìn)行消息的發(fā)送// Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//使用CLIENT端簽收的方式//Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//第四步:通過(guò)session創(chuàng)建Destination對(duì)象,指的是一個(gè)客戶端用來(lái)指定生產(chǎn)消息目標(biāo)和消費(fèi)消息來(lái)源的對(duì)象, 在PTP模式中,Destination被稱作Queue即隊(duì)列;在Pub/Sub模式,Destination被稱作Topic即主題. 在程序中可以使用多個(gè)Queue和Topic.Destination destination = session.createQueue("first");//第五步:通過(guò)session對(duì)象創(chuàng)建消息的發(fā)送和接收對(duì)象(生產(chǎn)者和消費(fèi)者)MessageProducer/MessageConsumerMessageProducer messageProducer = session.createProducer(null);//第六步:可以使用MessageProducer的setDeliveryMode方法為其設(shè)置持久化特性 和非持久化特性(DeliveryMode)//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//第七步:使用JMS規(guī)范的TextMessage形式創(chuàng)建數(shù)據(jù)(通過(guò)session對(duì)象),并用MessageProducer 的send方法發(fā)送數(shù)據(jù)for (int i = 1; i <= 10; i++) {//TextMessage textMessage = session.createTextMessage("helloworld"+i);TextMessage textMessage = session.createTextMessage("我是消息內(nèi)容"+i);//第一個(gè)參數(shù): 目的地//第二個(gè)參數(shù): 消息//第三個(gè)參數(shù): 是否持久化//第四個(gè)參數(shù): 優(yōu)先級(jí)【0-9 0-4表示普通 5-9表示加急 默認(rèn)4】//第五個(gè)參數(shù): 消息在mq上的存放有效期【單位毫秒】//messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2);messageProducer.send(destination, textMessage);//TimeUnit.SECONDS.sleep(1);System.out.println("生產(chǎn)者:"+textMessage.getText());}//提交數(shù)據(jù)session.commit();//session.rollback();if (connection!=null) {connection.close();}} }?接收端/消費(fèi)者:
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver { public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個(gè)發(fā)送或接收消息的線程 Session session; // Destination :消息的目的地;消息發(fā)送給誰(shuí). Destination destination; // 消費(fèi)者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp:/localhost:61616"); try { // 構(gòu)造從工廠得到連接對(duì)象 connection = connectionFactory.createConnection(); // 啟動(dòng) connection.start(); // 獲取操作連接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數(shù)值xingbo.xu-queue是一個(gè)服務(wù)器的queue,須在在ActiveMq的console配置 destination = session.createQueue("first"); consumer = session.createConsumer(destination); while (true) { // 設(shè)置接收者接收消息的時(shí)間,為了便于測(cè)試,這里誰(shuí)定為100s //TextMessage message = (TextMessage) consumer.receive(100000); TextMessage message = (TextMessage) consumer.receive(); if (null != message) { //System.out.println("收到消息" + message.getText()); System.out.println("消費(fèi)數(shù)據(jù):" + message.getText());} else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }?
生產(chǎn)者(沒(méi)有事務(wù)手動(dòng)簽收):
import javax.jms.Connection; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] args) throws Exception{//第一步:建立connectionFactory工廠對(duì)象【需填入用戶名、密碼、要連接的地址】ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","1234","tcp://localhost:61616");//第二步:通過(guò)ConnectionFactory工廠對(duì)象創(chuàng)建一個(gè)Connection連接,并且調(diào)用Connection的start方法 開(kāi)啟連接【connection默認(rèn)是關(guān)閉的】Connection connection = connectionFactory.createConnection();connection.start();//第三步:通過(guò)connection創(chuàng)建session會(huì)話(上下文環(huán)境對(duì)象),用于接收消息,參數(shù)配置1為是否啟用事務(wù),參數(shù)配置2為簽收模式【一般我們?cè)O(shè)置為自動(dòng)簽收】Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);//使用事務(wù)的方式進(jìn)行消息的發(fā)送// Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//使用CLIENT端簽收的方式//Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//第四步:通過(guò)session創(chuàng)建Destination對(duì)象,指的是一個(gè)客戶端用來(lái)指定生產(chǎn)消息目標(biāo)和消費(fèi)消息來(lái)源的對(duì)象,在PTP模式中,Destination被稱作Queue即隊(duì)列; 在Pub/Sub模式,Destination被稱作Topic即主題.在程序中可以使用多個(gè)Queue和Topic.Destination destination = session.createQueue("first");//第五步:通過(guò)session對(duì)象創(chuàng)建消息的發(fā)送和接收對(duì)象(生產(chǎn)者和消費(fèi)者)MessageProducer/MessageConsumerMessageProducer messageProducer = session.createProducer(null);//第六步:可以使用MessageProducer的setDeliveryMode方法為其設(shè)置持久化特性和非持久化特性(DeliveryMode)//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//第七步:使用JMS規(guī)范的TextMessage形式創(chuàng)建數(shù)據(jù)(通過(guò)session對(duì)象),并用MessageProducer的send方法發(fā)送數(shù)據(jù)for (int i = 1; i <= 20; i++) {//TextMessage textMessage = session.createTextMessage("helloworld"+i);TextMessage textMessage = session.createTextMessage("我是消息內(nèi)容"+i);//第一個(gè)參數(shù): 目的地//第二個(gè)參數(shù): 消息//第三個(gè)參數(shù): 是否持久化//第四個(gè)參數(shù): 優(yōu)先級(jí)【0-9 0-4表示普通 5-9表示加急 默認(rèn)4】//第五個(gè)參數(shù): 消息在mq上的存放有效期【單位毫秒】//messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2);messageProducer.send(destination, textMessage);//TimeUnit.SECONDS.sleep(1);System.out.println("生產(chǎn)者:"+textMessage.getText());}//提交數(shù)據(jù)//session.commit();//session.rollback();if (connection!=null) {connection.close();}} }消費(fèi)端(沒(méi)有事務(wù)手動(dòng)簽收):
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver { public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個(gè)發(fā)送或接收消息的線程 Session session; // Destination :消息的目的地;消息發(fā)送給誰(shuí). Destination destination; // 消費(fèi)者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616"); try { // 構(gòu)造從工廠得到連接對(duì)象 connection = connectionFactory.createConnection(); // 啟動(dòng) connection.start(); // 獲取操作連接 session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE); // 獲取session注意參數(shù)值xingbo.xu-queue是一個(gè)服務(wù)器的queue,須在在ActiveMq的console配置 destination = session.createQueue("first"); consumer = session.createConsumer(destination); while (true) { // 設(shè)置接收者接收消息的時(shí)間,為了便于測(cè)試,這里誰(shuí)定為100s //TextMessage message = (TextMessage) consumer.receive(100000); TextMessage message = (TextMessage) consumer.receive(); if (null != message) { //System.out.println("收到消息" + message.getText()); System.out.println("消費(fèi)數(shù)據(jù):" + message.getText());message.acknowledge();} else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }?
?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ 事务消息 手工签收的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: ActiveMQ 消息持久化到Mysql
- 下一篇: FastDFS_install_docu