深入掌握JMS--转
深入掌握J(rèn)MS(一):JSM基礎(chǔ)
1. JMS基本概念
???? JMS(Java Message Service) 即Java消息服務(wù)。它提供標(biāo)準(zhǔn)的產(chǎn)生、發(fā)送、接收消息的接口簡(jiǎn)化企業(yè)?應(yīng)用的開發(fā)。它支持兩種消息通信模型:點(diǎn)到點(diǎn)(point-to-point)(P2P)模型和發(fā)布/訂閱(Pub/Sub)模型。P2P 模型規(guī)定了一個(gè)消息只能有一個(gè)接收者;Pub/Sub 模型允許一個(gè)消息可以有多個(gè)接收者。
??? 對(duì)于點(diǎn)到點(diǎn)模型,消息生產(chǎn)者產(chǎn)生一個(gè)消息后,把這個(gè)消息發(fā)送到一個(gè)Queue(隊(duì)列)中,然后消息接收者再?gòu)倪@個(gè)Queue中讀取,一旦這個(gè)消息被一個(gè)接收者讀取之后,它就在這個(gè)Queue中消失了,所以一個(gè)消息只能被一個(gè)接收者消費(fèi)。
??? 與點(diǎn)到點(diǎn)模型不同,發(fā)布/訂閱模型中,消息生產(chǎn)者產(chǎn)生一個(gè)消息后,把這個(gè)消息發(fā)送到一個(gè)Topic中,這個(gè)Topic可以同時(shí)有多個(gè)接收者在監(jiān)聽,當(dāng)一個(gè)消息到達(dá)這個(gè)Topic之后,所有消息接收者都會(huì)收到這個(gè)消息。
?
?
| 簡(jiǎn)單的講,點(diǎn)到點(diǎn)模型和發(fā)布/訂閱模型的區(qū)別就是前者是一對(duì)一,后者是一對(duì)多。 |
?
2. 幾個(gè)重要概念?
???Destination?:消息發(fā)送的目的地,也就是前面說的Queue和Topic。創(chuàng)建好一個(gè)消息之后,只需要把這個(gè)消息發(fā)送到目的地,消息的發(fā)送者就可以繼續(xù)做自己的事情,而不用等待消息被處理完成。至于這個(gè)消息什么時(shí)候,會(huì)被哪個(gè)消費(fèi)者消費(fèi),完全取決于消息的接受者。
? ??Message?:從字面上就可以看出是被發(fā)送的消息。它有下面幾種類型:
??????? StreamMessage:Java 數(shù)據(jù)流消息,用標(biāo)準(zhǔn)流操作來順序的填充和讀取。
??????? MapMessage:一個(gè)Map類型的消息;名稱為 string 類型,而值為 Java 的基本類型。
??????? TextMessage:普通字符串消息,包含一個(gè)String。
??????? ObjectMessage:對(duì)象消息,包含一個(gè)可序列化的Java 對(duì)象
??????? BytesMessage:二進(jìn)制數(shù)組消息,包含一個(gè)byte[]。
??????? XMLMessage:? 一個(gè)XML類型的消息。
??? 最常用的是TextMessage和ObjectMessage。
???Session:?與JMS提供者所建立的會(huì)話,通過Session我們才可以創(chuàng)建一個(gè)Message。
???Connection:?與JMS提供者建立的一個(gè)連接??梢詮倪@個(gè)連接創(chuàng)建一個(gè)會(huì)話,即Session。
???ConnectionFactory:?那如何創(chuàng)建一個(gè)Connection呢?這就需要下面講到的ConnectionFactory了。通過這個(gè)工廠類就可以得到一個(gè)與JMS提供者的連接,即Conection。
???Producer:?消息的生產(chǎn)者,要發(fā)送一個(gè)消息,必須通過這個(gè)生產(chǎn)者來發(fā)送。
???MessageConsumer:?與生產(chǎn)者相對(duì)應(yīng),這是消息的消費(fèi)者或接收者,通過它來接收一個(gè)消息。
???前面多次提到JMS提供者,因?yàn)镴MS給我們提供的只是一系列接口,當(dāng)我們使用一個(gè)JMS的時(shí)候,還是需要一個(gè)第三方的提供者,它的作用就是真正管理?這些Connection,Session,Topic和Queue等。
??? 通過下面這個(gè)簡(jiǎn)圖可以看出上面這些概念的關(guān)系。
? ConnectionFactory---->Connection--->Session--->Message
? Destination + Session------------------------------------>Producer
? Destination + Session------------------------------------>MessageConsumer?
?
??? 那么可能有人會(huì)問: ConnectionFactory和Destination 從哪兒得到?
??? 這就和JMS提供者有關(guān)了. 如果在一個(gè)JavaEE環(huán)境中, 可以通過JNDI查找得到, 如果在一個(gè)非JavaEE環(huán)境中, 那只能通過JMS提供者提供給我們的接口得到了.
深入掌握J(rèn)MS(二):一個(gè)JMS例子
??????? 前一講簡(jiǎn)單的介紹了一下JMS的基本概念, 這一講結(jié)合一個(gè)例子讓大家深入理解前一講的基本概念. 首先需要做的是選擇一個(gè)JMS提供者, 如果在JavaEE環(huán)境中可以不用考慮這些. 我們選擇ActiveMQ, 官方地址: http://activemq.apache.org/. 網(wǎng)上有很多介紹ActiveMQ的文檔, 所以在這里就不介紹了.
按照上一講的這個(gè)簡(jiǎn)圖,
? ConnectionFactory---->Connection--->Session--->Message
? Destination + Session------------------------------------>Producer
? Destination + Session------------------------------------>MessageConsumer
首先需要得到ConnectionFactoy和Destination,這里創(chuàng)建一個(gè)一對(duì)一的Queue作為Destination。
? ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
? Queue queue = new ActiveMQQueue("testQueue");
然后又ConnectionFactory創(chuàng)建一個(gè)Connection, 再啟動(dòng)這個(gè)Connection:
? Connection connection = factory.createConnection();
? connection.start();
接下來需要由Connection創(chuàng)建一個(gè)Session:
? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
??? 現(xiàn)在暫且不用管參數(shù)的含義, 以后會(huì)詳細(xì)講到.
下面就可以創(chuàng)建Message了,這里創(chuàng)建一個(gè)TextMessage。
? Message message = session.createTextMessage("Hello JMS!");
要想把剛才創(chuàng)建的消息發(fā)送出去,需要由Session和Destination創(chuàng)建一個(gè)消息生產(chǎn)者:
? MessageProducer producer = session.createProducer(queue);
下面就可以發(fā)送剛才創(chuàng)建的消息了:
? producer.send(message);
消息發(fā)送完成之后,我們需要?jiǎng)?chuàng)建一個(gè)消息消費(fèi)者來接收這個(gè)消息:
? MessageConsumer comsumer = session.createConsumer(queue);
? Message recvMessage = comsumer.receive();
消息消費(fèi)者接收到這個(gè)消息之后,就可以得到它的內(nèi)容:
? System.out.println(((TextMessage)recvMessage).getText());
至此,一個(gè)簡(jiǎn)單的JMS例子就完成了。下面是全部源碼 :
import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class MessageSendAndReceive {public static void main(String[] args) throws Exception {ConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");Connection connection = factory.createConnection();connection.start();Queue queue = new ActiveMQQueue("testQueue");final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Message message = session.createTextMessage("Hello JMS!");MessageProducer producer = session.createProducer(queue);producer.send(message);System.out.println("Send Message Completed!");MessageConsumer comsumer = session.createConsumer(queue);Message recvMessage = comsumer.receive();System.out.println(((TextMessage)recvMessage).getText());} }?深入掌握J(rèn)MS(三):MessageListener
?
消息的消費(fèi)者接收消息可以采用兩種方式:
? 1、consumer.receive() 或 consumer.receive(int timeout);
? 2、注冊(cè)一個(gè)MessageListener。
采用第一種方式,消息的接收者會(huì)一直等待下去,直到有消息到達(dá),或者超時(shí)。后一種方式會(huì)注冊(cè)一個(gè)監(jiān)聽器,當(dāng)有消息到達(dá)的時(shí)候,會(huì)回調(diào)它的onMessage()方法。下面舉例說明:
MessageConsumer comsumer = session.createConsumer(queue);comsumer.setMessageListener(new MessageListener(){@Overridepublic void onMessage(Message m) {TextMessage textMsg = (TextMessage) m;try {System.out.println(textMsg.getText());} catch (JMSException e) {e.printStackTrace();}}});?
深入掌握J(rèn)MS(四):實(shí)戰(zhàn)Queue
? Queue實(shí)現(xiàn)的是點(diǎn)到點(diǎn)模型,在下面的例子中,啟動(dòng)2個(gè)消費(fèi)者共同監(jiān)聽一個(gè)Queue,然后循環(huán)給這個(gè)Queue中發(fā)送多個(gè)消息,我們依然采用ActiveMQ。
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue;public class QueueTest {public static void main(String[] args) throws Exception {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");Connection connection = factory.createConnection();connection.start();//創(chuàng)建一個(gè)QueueQueue queue = new ActiveMQQueue("testQueue");//創(chuàng)建一個(gè)SessionSession session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//注冊(cè)消費(fèi)者1MessageConsumer comsumer1 = session.createConsumer(queue);comsumer1.setMessageListener(new MessageListener(){public void onMessage(Message m) {try {System.out.println("Consumer1 get " + ((TextMessage)m).getText());} catch (JMSException e) {e.printStackTrace();}}});//注冊(cè)消費(fèi)者2MessageConsumer comsumer2 = session.createConsumer(queue);comsumer2.setMessageListener(new MessageListener(){public void onMessage(Message m) {try {System.out.println("Consumer2 get " + ((TextMessage)m).getText());} catch (JMSException e) {e.printStackTrace();}}});//創(chuàng)建一個(gè)生產(chǎn)者,然后發(fā)送多個(gè)消息。MessageProducer producer = session.createProducer(queue);for(int i=0; i<10; i++){producer.send(session.createTextMessage("Message:" + i));}} }運(yùn)行這個(gè)例子會(huì)得到下面的輸出結(jié)果:
?
[java]?view plaincopy
? 可以看出每個(gè)消息直被消費(fèi)了一次,但是如果有多個(gè)消費(fèi)者同時(shí)監(jiān)聽一個(gè)Queue的話,無法確定一個(gè)消息最終會(huì)被哪一個(gè)消費(fèi)者消費(fèi)。
?
深入掌握J(rèn)MS(五):實(shí)戰(zhàn)Topic
?
??? 與Queue不同的是,Topic實(shí)現(xiàn)的是發(fā)布/訂閱模型,在下面的例子中,啟動(dòng)2個(gè)消費(fèi)者共同監(jiān)聽一個(gè)Topic,然后循環(huán)給這個(gè)Topic中發(fā)送多個(gè)消息。
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQTopic;public class TopicTest {public static void main(String[] args) throws Exception {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");Connection connection = factory.createConnection();connection.start();//創(chuàng)建一個(gè)TopicTopic topic= new ActiveMQTopic("testTopic");Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//注冊(cè)消費(fèi)者1MessageConsumer comsumer1 = session.createConsumer(topic);comsumer1.setMessageListener(new MessageListener(){public void onMessage(Message m) {try {System.out.println("Consumer1 get " + ((TextMessage)m).getText());} catch (JMSException e) {e.printStackTrace();}}});//注冊(cè)消費(fèi)者2MessageConsumer comsumer2 = session.createConsumer(topic);comsumer2.setMessageListener(new MessageListener(){public void onMessage(Message m) {try {System.out.println("Consumer2 get " + ((TextMessage)m).getText());} catch (JMSException e) {e.printStackTrace();}}});//創(chuàng)建一個(gè)生產(chǎn)者,然后發(fā)送多個(gè)消息。MessageProducer producer = session.createProducer(topic);for(int i=0; i<10; i++){producer.send(session.createTextMessage("Message:" + i));}} }運(yùn)行后得到下面的輸出結(jié)果:
[java]?view plaincopy
說明每一個(gè)消息都會(huì)被所有的消費(fèi)者消費(fèi)。
?
深入掌握J(rèn)MS(六):消息頭
??? 一個(gè)消息對(duì)象分為三部分:消息頭(Headers),屬性(Properties)和消息體(Payload)。對(duì)于StreamMessage和MapMessage,消息本身就有特定的結(jié)構(gòu),而對(duì)于TextMessage,ObjectMessage和BytesMessage是無結(jié)構(gòu)的。一個(gè)消息可以包含一些重要的數(shù)據(jù)或者僅僅是一個(gè)事件的通知。
??? 消息的Headers部分通常包含一些消息的描述信息,它們都是標(biāo)準(zhǔn)的描述信息。包含下面一些值:
JMSDestination?
?????? 消息的目的地,Topic或者是Queue。
JMSDeliveryMode?
??????? 消息的發(fā)送模式:persistent或nonpersistent。前者表示消息在被消費(fèi)之前,如果JMS提供者DOWN了,重新啟動(dòng)后消息仍然存在。后者在這種情況下表示消息會(huì)被丟失??梢酝ㄟ^下面的方式設(shè)置:
?????? Producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
???????JMSTimestamp?
?????? 當(dāng)調(diào)用send()方法的時(shí)候,JMSTimestamp會(huì)被自動(dòng)設(shè)置為當(dāng)前事件??梢酝ㄟ^下面方式得到這個(gè)值:
?????? long timestamp = message.getJMSTimestamp();
JMSExpiration?
?????? 表示一個(gè)消息的有效期。只有在這個(gè)有效期內(nèi),消息消費(fèi)者才可以消費(fèi)這個(gè)消息。默認(rèn)值為0,表示消息永不過期??梢酝ㄟ^下面的方式設(shè)置:
?????? producer.setTimeToLive(3600000); //有效期1小時(shí) (1000毫秒 * 60秒 * 60分)
JMSPriority?
?????? 消息的優(yōu)先級(jí)。0-4為正常的優(yōu)先級(jí),5-9為高優(yōu)先級(jí)。可以通過下面方式設(shè)置:
?????? producer.setPriority(9);
JMSMessageID?
?????? 一個(gè)字符串用來唯一標(biāo)示一個(gè)消息。
JMSReplyTo?
?????? 有時(shí)消息生產(chǎn)者希望消費(fèi)者回復(fù)一個(gè)消息,JMSReplyTo為一個(gè)Destination,表示需要回復(fù)的目的地。當(dāng)然消費(fèi)者可以不理會(huì)它。
JMSCorrelationID?
?????? 通常用來關(guān)聯(lián)多個(gè)Message。例如需要回復(fù)一個(gè)消息,可以把JMSCorrelationID設(shè)置為所收到的消息的JMSMessageID。
JMSType?
?????? 表示消息體的結(jié)構(gòu),和JMS提供者有關(guān)。
JMSRedelivered?
?????? 如果這個(gè)值為true,表示消息是被重新發(fā)送了。因?yàn)橛袝r(shí)消費(fèi)者沒有確認(rèn)他已經(jīng)收到消息或者JMS提供者不確定消費(fèi)者是否已經(jīng)收到。
??? 除了Header,消息發(fā)送者可以添加一些屬性(Properties)。這些屬性可以是應(yīng)用自定義的屬性,JMS定義的屬性和JMS提供者定義的屬性。我們通常只適用自定義的屬性。
??? 后面會(huì)講到這些Header和屬性的用法。
深入掌握J(rèn)MS(七):DeliveryMode例子
?
在下面的例子中,分別發(fā)送一個(gè)Persistent和nonpersistent的消息,然后關(guān)閉退出JMS。
import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class DeliveryModeSendTest {public static void main(String[] args) throws Exception {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");Connection connection = factory.createConnection();connection.start();Queue queue = new ActiveMQQueue("testQueue");Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);MessageProducer producer = session.createProducer(queue);producer.setDeliveryMode(DeliveryMode.PERSISTENT);producer.send(session.createTextMessage("A persistent Message"));producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);producer.send(session.createTextMessage("A non persistent Message"));System.out.println("Send messages sucessfully!");} }?運(yùn)行上面的程序,當(dāng)輸出“Send messages sucessfully!”時(shí),說明兩個(gè)消息都已經(jīng)發(fā)送成功,然后我們結(jié)束它,來停止JMS Provider。
??? 接下來我們重新啟動(dòng)JMS Provicer,然后添加一個(gè)消費(fèi)者:
運(yùn)行上面的程序,可以得到下面的輸出結(jié)果:
Consumer get A persistent Message
可以看出消息消費(fèi)者只接收到一個(gè)消息,它是一個(gè)Persistent的消息。而剛才發(fā)送的non persistent消息已經(jīng)丟失了。
另外, 如果發(fā)送一個(gè)non persistent消息, 而剛好這個(gè)時(shí)候沒有消費(fèi)者在監(jiān)聽, 這個(gè)消息也會(huì)丟失.
?
?
?
深入掌握J(rèn)MS(八):JMSReplyTo
?
?
??? 在下面的例子中,首先創(chuàng)建兩個(gè)Queue,發(fā)送者給一個(gè)Queue發(fā)送,接收者接收到消息之后給另一個(gè)Queue回復(fù)一個(gè)Message,然后再創(chuàng)建一個(gè)消費(fèi)者來接受所回復(fù)的消息。
import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.command.ActiveMQQueue; public class MessageSendReceiveAndReply {public static void main(String[] args) throws Exception {ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");Connection connection = factory.createConnection();connection.start();//消息發(fā)送到這個(gè)QueueQueue queue = new ActiveMQQueue("testQueue");//消息回復(fù)到這個(gè)QueueQueue replyQueue = new ActiveMQQueue("replyQueue");final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創(chuàng)建一個(gè)消息,并設(shè)置它的JMSReplyTo為replyQueue。Message message = session.createTextMessage("Andy");message.setJMSReplyTo(replyQueue);MessageProducer producer = session.createProducer(queue);producer.send(message);//消息的接收者MessageConsumer comsumer = session.createConsumer(queue);comsumer.setMessageListener(new MessageListener(){public void onMessage(Message m) {try {//創(chuàng)建一個(gè)新的MessageProducer來發(fā)送一個(gè)回復(fù)消息。MessageProducer producer = session.createProducer(m.getJMSReplyTo());producer.send(session.createTextMessage("Hello " + ((TextMessage) m).getText()));} catch (JMSException e1) {e1.printStackTrace();}}});//這個(gè)接收者用來接收回復(fù)的消息MessageConsumer comsumer2 = session.createConsumer(replyQueue);comsumer2.setMessageListener(new MessageListener(){public void onMessage(Message m) {try {System.out.println(((TextMessage) m).getText());} catch (JMSException e) {e.printStackTrace();}}});} }首先消息生產(chǎn)者發(fā)送一個(gè)消息,內(nèi)容為“Andy”, 然后消費(fèi)者收到這個(gè)消息之后根據(jù)消息的JMSReplyTo,回復(fù)一個(gè)消息,內(nèi)容為“Hello Andy‘。 最后在回復(fù)的Queue上創(chuàng)建一個(gè)接收回復(fù)消息的消費(fèi)者,它輸出所回復(fù)的內(nèi)容。
??? 運(yùn)行上面的程序,可以得到下面的輸出結(jié)果:
?
??
原創(chuàng)的地址找不到,只找到轉(zhuǎn)載的地址:http://blog.csdn.net/zhangxs_3/article/details/4034713
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/3821906.html
總結(jié)
以上是生活随笔為你收集整理的深入掌握JMS--转的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Realm Configuration
- 下一篇: JTA 深度历险 - 原理与实现---转