ActiveMQ 消息持久化
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ 消息持久化
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
生產端(消息持久化):
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Sender {public static void main(String[] args) throws Exception{//第一步:建立connectionFactory工廠對象【需填入用戶名、密碼、要連接的地址】ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin","1234","tcp://localhost:61616");//第二步:通過ConnectionFactory工廠對象創建一個Connection連接,并且調用Connection的start方法開啟連接【connection默認是關閉的】Connection connection = connectionFactory.createConnection();connection.start();//第三步:通過connection創建session會話(上下文環境對象),用于接收消息,參數配置1為是否啟用事務,參數配置2為簽收模式【一般我們設置為自動簽收】Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);//使用事務的方式進行消息的發送// Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//使用CLIENT端簽收的方式//Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);//第四步:通過session創建Destination對象,指的是一個客戶端用來指定生產消息目標和消費消息來源的對象,在PTP模式中,Destination被稱作Queue即隊列;在Pub/Sub模式,Destination被稱作Topic即主題.在程序中可以使用多個Queue和Topic.Destination destination = session.createQueue("first");//第五步:通過session對象創建消息的發送和接收對象(生產者和消費者)MessageProducer/MessageConsumerMessageProducer messageProducer = session.createProducer(null);//第六步:可以使用MessageProducer的setDeliveryMode方法為其設置持久化特性和非持久化特性(DeliveryMode)messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);for (int i = 1; i <= 10; i++) {//TextMessage textMessage = session.createTextMessage("helloworld"+i);TextMessage textMessage = session.createTextMessage("我是消息內容"+i);//第一個參數: 目的地//第二個參數: 消息//第三個參數: 是否持久化//第四個參數: 優先級【0-9 0-4表示普通 5-9表示加急 默認4】//第五個參數: 消息在mq上的存放有效期【單位毫秒】//messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2);messageProducer.send(destination, textMessage);//TimeUnit.SECONDS.sleep(1);System.out.println("生產者:"+textMessage.getText());}//提交數據//session.commit();//session.rollback();if (connection!=null) {connection.close();}} }?
消費端(消息持久化):
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Receiver { public static void main(String[] args) { // ConnectionFactory :連接工廠,JMS 用它創建連接 ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個發送或接收消息的線程 Session session; // Destination :消息的目的地;消息發送給誰. Destination destination; // 消費者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( "admin", "1234", "tcp://localhost:61616"); try { // 構造從工廠得到連接對象 connection = connectionFactory.createConnection(); // 啟動 connection.start(); // 獲取操作連接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 獲取session注意參數值xingbo.xu-queue是一個服務器的queue,須在在ActiveMq的console配置 destination = session.createQueue("first"); consumer = session.createConsumer(destination); while (true) { // 設置接收者接收消息的時間,為了便于測試,這里誰定為100s //TextMessage message = (TextMessage) consumer.receive(100000); TextMessage message = (TextMessage) consumer.receive(); if (null != message) { //System.out.println("收到消息" + message.getText()); System.out.println("消費數據:" + message.getText());//message.acknowledge();} else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } }?
總結
以上是生活随笔為你收集整理的ActiveMQ 消息持久化的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FastDFS_install_docu
- 下一篇: synchronized细节问题