Spring+ActiveMQ+Mysql 配置JMS
生活随笔
收集整理的這篇文章主要介紹了
Spring+ActiveMQ+Mysql 配置JMS
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
消息發(fā)送public static void main(String[] args) throws JMSException{// ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");// JMS 客戶端到JMS Provider 的連接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一個(gè)發(fā)送或接收消息的線程Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// Destination :消息的目的地;消息發(fā)送給誰.// 獲取session注意參數(shù)值Queue.Name是Query的名字Destination destination = session.createQueue("[color=red]Queue.Name[/color]");// MessageProducer:消息生產(chǎn)者M(jìn)essageProducer producer = session.createProducer(destination);// 設(shè)置不持久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);// 發(fā)送一條消息sendMsg(session, producer);session.commit();connection.close();}/*** 在指定的會(huì)話上,通過指定的消息生產(chǎn)者發(fā)出一條消息* * @param session 消息會(huì)話* @param producer 消息生產(chǎn)者*/public static void sendMsg(Session session, MessageProducer producer) throws JMSException{// 創(chuàng)建一條文本消息TextMessage message = session.createTextMessage("Hello ActiveMQ!");// 通過消息生產(chǎn)者發(fā)出消息producer.send(message);System.out.println("");}
消息接收public static void main(String[] args) throws JMSException{// ConnectionFactory :連接工廠,JMS 用它創(chuàng)建連接ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");// JMS 客戶端到JMS Provider 的連接Connection connection = connectionFactory.createConnection();connection.start();// Session: 一個(gè)發(fā)送或接收消息的線程Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);// Destination :消息的目的地;消息發(fā)送給誰.// 獲取session注意參數(shù)值xingbo.xu-queue是一個(gè)服務(wù)器的queue,須在在ActiveMq的console配置Destination destination = session.createQueue("Queue.Name");// 消費(fèi)者,消息接收者M(jìn)essageConsumer consumer = session.createConsumer(destination);while(true){TextMessage message = (TextMessage) consumer.receive(1000);if(null != message)System.out.println("收到消息:" + message.getText());elsebreak;}session.close();connection.close();}
spring application.xml 文件配置<!-- 配置JMS消息發(fā)送 --><bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL"><value>tcp://localhost:61616</value></property></bean></property></bean><!-- Spring JMS Template --><bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory"><ref local="jmsFactory" /></property></bean><bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg index="0"><value>Queue.Name</value></constructor-arg></bean><bean id="sender" class="demo.JmsQueueSender"><property name="jmsTemplate" ref="myJmsTemplate"></property></bean><bean id="receive" class="demo.JmsQueueReceiver"></bean><bean id="listenerContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="jmsFactory"></property><property name="messageListener" ref="receive"></property><property name="destination" ref="destination" /></bean><!-- 配置JMS消息發(fā)送完成 -->
sender@Component
public class JmsQueueSender
{private JmsTemplate jmsTemplate;public void setConnectionFactory(ConnectionFactory cf){this.jmsTemplate = new JmsTemplate(cf);}public void simpleSend(){jmsTemplate.convertAndSend("Queue.Name", "test!!!");}public JmsTemplate getJmsTemplate(){return jmsTemplate;}public void setJmsTemplate(JmsTemplate jmsTemplate){this.jmsTemplate = jmsTemplate;}
}
receiver@Component
public class JmsQueueReceiver implements MessageListener
{@Overridepublic void onMessage(Message message){if(message instanceof TextMessage){final TextMessage textMessage = (TextMessage) message;try{System.out.println(textMessage.getText());}catch(final JMSException e){e.printStackTrace();}}}
}
配置ActiveMQ以數(shù)據(jù)庫的方式存儲(chǔ)消息 ActiveMQ安裝目錄\conf\activemq.xml 找到 <broker>標(biāo)簽中的內(nèi)容<persistenceAdapter><kahaDB directory="${activemq.base}/data/kahadb"/></persistenceAdapter>注釋掉以上內(nèi)容,添加自己的數(shù)據(jù)庫配置<persistenceAdapter><jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>配置以Mysql的方式保存消息 <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"><property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/><property name="username" value="root"/><property name="password" value="root"/> <property name="poolPreparedStatements" value="true"/>
</bean> 將Mysql的包加到ActiveMQ的啟動(dòng)Lib下 在Mysql數(shù)據(jù)中新建數(shù)據(jù)庫 test,ActiveMQ在啟動(dòng)的時(shí)候會(huì)自動(dòng)建表。 重新啟動(dòng)服務(wù)。 這樣消息的發(fā)送者的消息將被保存到Mysql數(shù)據(jù)庫,同時(shí)消息消耗者每讀取一條消息。數(shù)據(jù)庫中的消息也會(huì)相應(yīng)的刪除。
?
總結(jié)
以上是生活随笔為你收集整理的Spring+ActiveMQ+Mysql 配置JMS的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 设计模式之_Iterator_06
- 下一篇: mysql的事物隔离级别