日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

activeMQ的三种通讯模式

發布時間:2024/4/15 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 activeMQ的三种通讯模式 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

publish-subscribe

? ? ?發布訂閱模式有點類似于我們日常生活中訂閱報紙。每年到年尾的時候,郵局就會發一本報紙集合讓我們來選擇訂閱哪一個。在這個表里頭列了所有出版發行的報紙,那么對于我們每一個訂閱者來說,我們可以選擇一份或者多份報紙。比如北京日報、瀟湘晨報等。那么這些個我們訂閱的報紙,就相當于發布訂閱模式里的topic。有很多個人訂閱報紙,也有人可能和我訂閱了相同的報紙。那么,在這里,相當于我們在同一個topic里注冊了。對于一份報紙發行方來說,它和所有的訂閱者就構成了一個1對多的關系。這種關系如下圖所示:

? ? ?現在,假定我們用前面討論的場景來寫一個簡單的示例。我們首先需要定義的是publisher.

publisher

? ? ?publisher是屬于發布信息的一方,它通過定義一個或者多個topic,然后給這些topic發送消息。

? ? publisher的構造函數如下:

public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); }

[java]?view plain?copy
  • public?Publisher()?throws?JMSException?{??
  • ????????factory?=?new?ActiveMQConnectionFactory(brokerURL);??
  • ????????connection?=?factory.createConnection();??
  • ????????try?{??
  • ????????connection.start();??
  • ????????}?catch?(JMSException?jmse)?{??
  • ????????????connection.close();??
  • ????????????throw?jmse;??
  • ????????}??
  • ????????session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??
  • ????????producer?=?session.createProducer(null);??
  • ????}??
  • 我們按照前面說的流程定義了基本的connectionFactory, connection, session, producer。這里代碼就是主要實現初始化的效果。

    ? ? 接著,我們需要定義一系列的topic讓所有的consumer來訂閱,設置topic的代碼如下:

    [java]?view plain?copy
  • protected?void?setTopics(String[]?stocks)?throws?JMSException?{??
  • ????destinations?=?new?Destination[stocks.length];??
  • ????for(int?i?=?0;?i?<?stocks.length;?i++)?{??
  • ????????destinations[i]?=?session.createTopic("STOCKS."?+?stocks[i]);??
  • ????}??
  • }??
  • ? ? ?這里destinations是一個內部定義的成員變量Destination[]。這里我們總共定義了的topic數取決于給定的參數stocks。

    ? ? ?在定義好topic之后我們要給這些指定的topic發消息,具體實現的代碼如下:

  • protected?void?sendMessage(String[]?stocks)?throws?JMSException?{??
  • ????for(int?i?=?0;?i?<?stocks.length;?i++)?{??
  • ????????Message?message?=?createStockMessage(stocks[i],?session);??
  • ????????System.out.println("Sending:?"?+?((ActiveMQMapMessage)message).getContentMap()?+?"?on?destination:?"?+?destinations[i]);??
  • ????????producer.send(destinations[i],?message);??
  • ????}??
  • }??
  • ??
  • protected?Message?createStockMessage(String?stock,?Session?session)?throws?JMSException?{??
  • ????MapMessage?message?=?session.createMapMessage();??
  • ????message.setString("stock",?stock);??
  • ????message.setDouble("price",?1.00);??
  • ????message.setDouble("offer",?0.01);??
  • ????message.setBoolean("up",?true);??
  • ??????????
  • ????return?message;??
  • }
  • [java]?view plain?copy
  • protected?void?sendMessage(String[]?stocks)?throws?JMSException?{??
  • ????for(int?i?=?0;?i?<?stocks.length;?i++)?{??
  • ????????Message?message?=?createStockMessage(stocks[i],?session);??
  • ????????System.out.println("Sending:?"?+?((ActiveMQMapMessage)message).getContentMap()?+?"?on?destination:?"?+?destinations[i]);??
  • ????????producer.send(destinations[i],?message);??
  • ????}??
  • }??
  • ??
  • protected?Message?createStockMessage(String?stock,?Session?session)?throws?JMSException?{??
  • ????MapMessage?message?=?session.createMapMessage();??
  • ????message.setString("stock",?stock);??
  • ????message.setDouble("price",?1.00);??
  • ????message.setDouble("offer",?0.01);??
  • ????message.setBoolean("up",?true);??
  • ??????????
  • ????return?message;??
  • }??
  • ? ? ?前面的代碼很簡單,在sendMessage方法里我們遍歷每個topic,然后給每個topic發送定義的Message消息。

    ? ? 在定義好前面發送消息的基礎之后,我們調用他們的代碼就很簡單了:

    [java]?view plain?copy
  • public?static?void?main(String[]?args)?throws?JMSException?{??
  • ????if(args.length?<?1)??
  • ????????throw?new?IllegalArgumentException();??
  • ??????
  • ????????//?Create?publisher???????
  • ????????Publisher?publisher?=?new?Publisher();??
  • ??????????
  • ????????//?Set?topics??
  • ????publisher.setTopics(args);??
  • ??????????
  • ????for(int?i?=?0;?i?<?10;?i++)?{??
  • ????????publisher.sendMessage(args);??
  • ????????System.out.println("Publisher?'"?+?i?+?"?price?messages");??
  • ????????try?{??
  • ????????????Thread.sleep(1000);??
  • ????????}?catch(InterruptedException?e)?{??
  • ????????????e.printStackTrace();??
  • ????????}??
  • ????}??
  • ????//?Close?all?resources??
  • ????publisher.close();??
  • }??
  • ? ? ?調用他們的代碼就是我們遍歷所有topic,然后通過sendMessage發送消息。在發送一個消息之后先sleep1秒鐘。要注意的一個地方就是我們使用完資源之后必須要使用close方法將這些資源關閉釋放。close方法關閉資源的具體實現如下:

    [java]?view plain?copy
  • public?void?close()?throws?JMSException?{??
  • ????if?(connection?!=?null)?{??
  • ????????connection.close();??
  • ?????}??
  • }??

  • consumer

    ? ? Consumer的代碼也很類似,具體的步驟無非就是1.初始化資源。 2. 接收消息。 3. 必要的時候關閉資源。

    ? ? 初始化資源可以放到構造函數里面:

    [java]?view plain?copy
  • public?Consumer()?throws?JMSException?{??
  • ????????factory?=?new?ActiveMQConnectionFactory(brokerURL);??
  • ????????connection?=?factory.createConnection();??
  • ????????connection.start();??
  • ????????session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??
  • ????}??
  • ? ? ?接收和處理消息的方法有兩種,分為同步和異步的,一般同步的方式我們是通過MessageConsumer.receive()方法來處理接收到的消息。而異步的方法則是通過注冊一個MessageListener的方法,使用MessageConsumer.setMessageListener()。這里我們采用異步的方式實現:

    [java]?view plain?copy
  • public?static?void?main(String[]?args)?throws?JMSException?{??
  • ????Consumer?consumer?=?new?Consumer();??
  • ????for?(String?stock?:?args)?{??
  • ????Destination?destination?=?consumer.getSession().createTopic("STOCKS."?+?stock);??
  • ????MessageConsumer?messageConsumer?=?consumer.getSession().createConsumer(destination);??
  • ????messageConsumer.setMessageListener(new?Listener());??
  • ????}??
  • }??
  • ??????
  • public?Session?getSession()?{??
  • ????return?session;??
  • }??
  • ? ? ?在前面的代碼里我們先找到同樣的topic,然后遍歷所有的topic去獲得消息。對于消息的處理我們專門通過Listener對象來負責。

    ? ? Listener對象的職責很簡單,主要就是處理接收到的消息:

    [java]?view plain?copy
  • public?class?Listener?implements?MessageListener?{??
  • ??
  • ????public?void?onMessage(Message?message)?{??
  • ????????try?{??
  • ????????????MapMessage?map?=?(MapMessage)message;??
  • ????????????String?stock?=?map.getString("stock");??
  • ????????????double?price?=?map.getDouble("price");??
  • ????????????double?offer?=?map.getDouble("offer");??
  • ????????????boolean?up?=?map.getBoolean("up");??
  • ????????????DecimalFormat?df?=?new?DecimalFormat(?"#,###,###,##0.00"?);??
  • ????????????System.out.println(stock?+?"\t"?+?df.format(price)?+?"\t"?+?df.format(offer)?+?"\t"?+?(up?"up":"down"));??
  • ????????}?catch?(Exception?e)?{??
  • ????????????e.printStackTrace();??
  • ????????}??
  • ????}??
  • ??
  • }??
  • ? ? 它實現了MessageListener接口,里面的onMessage方法就是在接收到消息之后會被調用的方法。

    ? ? 現在,通過實現前面的publisher和consumer我們已經實現了pub-sub模式的一個實例。仔細回想它的步驟的話,主要就是要兩者設定一個共同的topic,有了這個topic之后他們可以實現一方發消息另外一方接收。另外,為了連接到具體的message server,這里是使用了連接tcp://localhost:16161作為定義ActiveMQConnectionFactory的路徑。在publisher端通過session創建producer,根據指定的參數創建destination,然后將消息和destination作為producer.send()方法的參數發消息。在consumer端也要創建類似的connection, session。通過session得到destination,再通過session.createConsumer(destination)來得到一個MessageConsumer對象。有了這個MessageConsumer我們就可以自行選擇是直接同步的receive消息還是注冊listener了。

    p2p

    ? ? p2p的過程則理解起來更加簡單。它好比是兩個人打電話,這兩個人是獨享這一條通信鏈路的。一方發送消息,另外一方接收,就這么簡單。在實際應用中因為有多個用戶對使用p2p的鏈路,它的通信場景如下圖所示:

    ? ? 我們再來看看一個p2p的示例:

    ? ? 在p2p的場景里,相互通信的雙方是通過一個類似于隊列的方式來進行交流。和前面pub-sub的區別在于一個topic有一個發送者和多個接收者,而在p2p里一個queue只有一個發送者和一個接收者。

    發送者

    ? ? 和前面的示例非常相似,我們構造函數里需要初始化的內容基本上差不多:

    ?

    [java]?view plain?copy
  • public?Publisher()?throws?JMSException?{??
  • ????factory?=?new?ActiveMQConnectionFactory(brokerURL);??
  • ????connection?=?factory.createConnection();??
  • ????connection.start();??
  • ????session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??
  • ????producer?=?session.createProducer(null);??
  • }??
  • ? ? ?發送消息的方法如下:

    ?

    ?

    [java]?view plain?copy
  • public?void?sendMessage()?throws?JMSException?{??
  • ????for(int?i?=?0;?i?<?jobs.length;?i++)??
  • ????{??
  • ????????String?job?=?jobs[i];??
  • ????????Destination?destination?=?session.createQueue("JOBS."?+?job);??
  • ????????Message?message?=?session.createObjectMessage(i);??
  • ????????System.out.println("Sending:?id:?"?+?((ObjectMessage)message).getObject()?+?"?on?queue:?"?+?destination);??
  • ????????producer.send(destination,?message);??
  • ????}??
  • }??
  • ? ? ?這里我們定義了一個jobs的數組,通過遍歷這個數組來創建不同的job queue。這樣就相當于建立了多個點對點通信的鏈路。

    ?

    ? ? 消息發送者的啟動代碼如下:

    ?

    [java]?view plain?copy
  • public?static?void?main(String[]?args)?throws?JMSException?{??
  • ????Publisher?publisher?=?new?Publisher();??
  • ????for(int?i?=?0;?i?<?10;?i++)?{??
  • ????????publisher.sendMessage();??
  • ????????System.out.println("Published?"?+?i?+?"?job?messages");??
  • ????try?{??
  • ????????????Thread.sleep(1000);??
  • ????????}?catch?(InterruptedException?x)?{??
  • ????????e.printStackTrace();??
  • ????????}??
  • ????}??
  • ????publisher.close();??
  • }??
  • ? ? ?我們在這里發送10條消息,當然,在每個sendMessage的方法里實際上是針對每個queue發送了10條。

    ?

    ?

    接收者

    ? ? ?接收者的代碼很簡單,一個構造函數初始化所有的資源:

    ?

    [java]?view plain?copy
  • public?Consumer()?throws?JMSException?{??
  • ????????factory?=?new?ActiveMQConnectionFactory(brokerURL);??
  • ????????connection?=?factory.createConnection();??
  • ????????connection.start();??
  • ????????session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??
  • ????}??
  • ? ? 還有一個就是注冊消息處理的對象:

    ?

    ?

    [java]?view plain?copy
  • public?static?void?main(String[]?args)?throws?JMSException?{??
  • ????????Consumer?consumer?=?new?Consumer();??
  • ????????for?(String?job?:?consumer.jobs)?{??
  • ????????????Destination?destination?=?consumer.getSession().createQueue("JOBS."?+?job);??
  • ????????????MessageConsumer?messageConsumer?=?consumer.getSession().createConsumer(destination);??
  • ????????????messageConsumer.setMessageListener(new?Listener(job));??
  • ????????}??
  • ????}??
  • ??????
  • ????public?Session?getSession()?{??
  • ????????return?session;??
  • ????}??
  • ? ? ?具體注冊的對象處理方法和前面還是類似,實現MessageListener接口就可以了。 ? [java]?view plain?copy
  • import?javax.jms.Message;??
  • import?javax.jms.MessageListener;??
  • import?javax.jms.ObjectMessage;??
  • ??
  • public?class?Listener?implements?MessageListener?{??
  • ??
  • ????private?String?job;??
  • ??????
  • ????public?Listener(String?job)?{??
  • ????????this.job?=?job;??
  • ????}??
  • ??
  • ????public?void?onMessage(Message?message)?{??
  • ????????try?{??
  • ????????????//do?something?here??
  • ????????????System.out.println(job?+?"?id:"?+?((ObjectMessage)message).getObject());??
  • ????????}?catch?(Exception?e)?{??
  • ????????????e.printStackTrace();??
  • ????????}??
  • ????}??
  • ??
  • }??
  • ? ? ?這里代碼和前面pub-sub的具體實現代碼非常相似,就不再贅述。

    ?

    ? ? ?現在如果我們比較一下pub-sub和p2p模式的具體實現步驟的話,我們會發現他們基本的處理流程都是類似的,除了在pub-sub中要通過createTopic來設置topic,而在p2p中要通過createQueue來創建通信隊列。他們之間存在著很多的重復之處,在具體的開發過程中,我們是否可以進行一些工程上的優化呢?別急,后面我們會討論到的。

    request-response

    ? ? 和前面兩種方式比較起來,request-response的通信方式很常見,但是不是默認提供的一種模式。在前面的兩種模式中都是一方負責發送消息而另外一方負責處理。而我們實際中的很多應用相當于一種一應一答的過程,需要雙方都能給對方發送消息。于是請求-應答的這種通信方式也很重要。它也應用的很普遍。?

    ? ? ?請求-應答方式并不是JMS規范系統默認提供的一種通信方式,而是通過在現有通信方式的基礎上稍微運用一點技巧實現的。下圖是典型的請求-應答方式的交互過程:

    ? ? ?在JMS里面,如果要實現請求/應答的方式,可以利用JMSReplyTo和JMSCorrelationID消息頭來將通信的雙方關聯起來。另外,QueueRequestor和TopicRequestor能夠支持簡單的請求/應答過程。

    ? ? 現在,如果我們要實現這么一個過程,在發送請求消息并且等待返回結果的client端的流程如下:?

    [java]?view plain?copy
  • //?client?side??
  • Destination?tempDest?=?session.createTemporaryQueue();??
  • MessageConsumer?responseConsumer?=?session.createConsumer(tempDest);??
  • ...??
  • ??
  • //?send?a?request..??
  • message.setJMSReplyTo(tempDest)??
  • message.setJMSCorrelationID(myCorrelationID);??
  • ??
  • producer.send(message);??
  • ? ? ?client端創建一個臨時隊列并在發送的消息里指定了發送返回消息的destination以及correlationID。那么在處理消息的server端得到這個消息后就知道該發送給誰了。Server端的大致流程如下:

    ?

    [java]?view plain?copy
  • public?void?onMessage(Message?request)?{??
  • ??
  • ??Message?response?=?session.createMessage();??
  • ??response.setJMSCorrelationID(request.getJMSCorrelationID())??
  • ??
  • ??producer.send(request.getJMSReplyTo(),?response)??
  • }??
  • ? ? 這里我們是用server端注冊MessageListener,通過設置返回信息的CorrelationID和JMSReplyTo將信息返回。

    ? ? 以上就是發送和接收消息的雙方的大致程序結構。具體的實現代碼如下:

    ?Client:


    [java]?view plain?copy
  • public?Client()?{??
  • ????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory("tcp://localhost:61616");??
  • ????????Connection?connection;??
  • ????????try?{??
  • ????????????connection?=?connectionFactory.createConnection();??
  • ????????????connection.start();??
  • ????????????Session?session?=?connection.createSession(transacted,?ackMode);??
  • ????????????Destination?adminQueue?=?session.createQueue(clientQueueName);??
  • ??
  • ????????????//Setup?a?message?producer?to?send?message?to?the?queue?the?server?is?consuming?from??
  • ????????????this.producer?=?session.createProducer(adminQueue);??
  • ????????????this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);??
  • ??
  • ????????????//Create?a?temporary?queue?that?this?client?will?listen?for?responses?on?then?create?a?consumer??
  • ????????????//that?consumes?message?from?this?temporary?queue...for?a?real?application?a?client?should?reuse??
  • ????????????//the?same?temp?queue?for?each?message?to?the?server...one?temp?queue?per?client??
  • ????????????Destination?tempDest?=?session.createTemporaryQueue();??
  • ????????????MessageConsumer?responseConsumer?=?session.createConsumer(tempDest);??
  • ??
  • ????????????//This?class?will?handle?the?messages?to?the?temp?queue?as?well??
  • ????????????responseConsumer.setMessageListener(this);??
  • ??
  • ????????????//Now?create?the?actual?message?you?want?to?send??
  • ????????????TextMessage?txtMessage?=?session.createTextMessage();??
  • ????????????txtMessage.setText("MyProtocolMessage");??
  • ??
  • ????????????//Set?the?reply?to?field?to?the?temp?queue?you?created?above,?this?is?the?queue?the?server??
  • ????????????//will?respond?to??
  • ????????????txtMessage.setJMSReplyTo(tempDest);??
  • ??
  • ????????????//Set?a?correlation?ID?so?when?you?get?a?response?you?know?which?sent?message?the?response?is?for??
  • ????????????//If?there?is?never?more?than?one?outstanding?message?to?the?server?then?the??
  • ????????????//same?correlation?ID?can?be?used?for?all?the?messages...if?there?is?more?than?one?outstanding??
  • ????????????//message?to?the?server?you?would?presumably?want?to?associate?the?correlation?ID?with?this??
  • ????????????//message?somehow...a?Map?works?good??
  • ????????????String?correlationId?=?this.createRandomString();??
  • ????????????txtMessage.setJMSCorrelationID(correlationId);??
  • ????????????this.producer.send(txtMessage);??
  • ????????}?catch?(JMSException?e)?{??
  • ????????????//Handle?the?exception?appropriately??
  • ????????}??
  • ????}??
  • ? ? 這里的代碼除了初始化構造函數里的參數還同時設置了兩個destination,一個是自己要發送消息出去的destination,在session.createProducer(adminQueue);這一句設置。另外一個是自己要接收的消息destination, 通過Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); 這兩句指定了要接收消息的目的地。這里是用的一個臨時隊列。在前面指定了返回消息的通信隊列之后,我們需要通知server端知道發送返回消息給哪個隊列。于是txtMessage.setJMSReplyTo(tempDest);指定了這一部分,同時txtMessage.setJMSCorrelationID(correlationId);方法主要是為了保證每次發送回來請求的server端能夠知道對應的是哪個請求。這里一個請求和一個應答是相當于對應一個相同的序列號一樣。

    ? ? 同時,因為client端在發送消息之后還要接收server端返回的消息,所以它也要實現一個消息receiver的功能。這里采用實現MessageListener接口的方式:

    [java]?view plain?copy
  • public?void?onMessage(Message?message)?{??
  • ????????String?messageText?=?null;??
  • ????????try?{??
  • ????????????if?(message?instanceof?TextMessage)?{??
  • ????????????????TextMessage?textMessage?=?(TextMessage)?message;??
  • ????????????????messageText?=?textMessage.getText();??
  • ????????????????System.out.println("messageText?=?"?+?messageText);??
  • ????????????}??
  • ????????}?catch?(JMSException?e)?{??
  • ????????????//Handle?the?exception?appropriately??
  • ????????}??
  • ????}??
  • ?

    Server:

    ? ? ?這里server端要執行的過程和client端相反,它是先接收消息,在接收到消息后根據提供的JMSCorelationID來發送返回的消息:

    [java]?view plain?copy
  • public?void?onMessage(Message?message)?{??
  • ????????try?{??
  • ????????????TextMessage?response?=?this.session.createTextMessage();??
  • ????????????if?(message?instanceof?TextMessage)?{??
  • ????????????????TextMessage?txtMsg?=?(TextMessage)?message;??
  • ????????????????String?messageText?=?txtMsg.getText();??
  • ????????????????response.setText(this.messageProtocol.handleProtocolMessage(messageText));??
  • ????????????}??
  • ??
  • ????????????//Set?the?correlation?ID?from?the?received?message?to?be?the?correlation?id?of?the?response?message??
  • ????????????//this?lets?the?client?identify?which?message?this?is?a?response?to?if?it?has?more?than??
  • ????????????//one?outstanding?message?to?the?server??
  • ????????????response.setJMSCorrelationID(message.getJMSCorrelationID());??
  • ??
  • ????????????//Send?the?response?to?the?Destination?specified?by?the?JMSReplyTo?field?of?the?received?message,??
  • ????????????//this?is?presumably?a?temporary?queue?created?by?the?client??
  • ????????????this.replyProducer.send(message.getJMSReplyTo(),?response);??
  • ????????}?catch?(JMSException?e)?{??
  • ????????????//Handle?the?exception?appropriately??
  • ????????}??
  • ????}??
  • ? ? 前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要發送消息回去的destination。

    ? ? 另外,設置這些發送返回信息的replyProducer的信息主要在構造函數相關的方法里實現了:

    [java]?view plain?copy
  • public?Server()?{??
  • ????????try?{??
  • ????????????//This?message?broker?is?embedded??
  • ????????????BrokerService?broker?=?new?BrokerService();??
  • ????????????broker.setPersistent(false);??
  • ????????????broker.setUseJmx(false);??
  • ????????????broker.addConnector(messageBrokerUrl);??
  • ????????????broker.start();??
  • ????????}?catch?(Exception?e)?{??
  • ????????????//Handle?the?exception?appropriately??
  • ????????}??
  • ??
  • ????????//Delegating?the?handling?of?messages?to?another?class,?instantiate?it?before?setting?up?JMS?so?it??
  • ????????//is?ready?to?handle?messages??
  • ????????this.messageProtocol?=?new?MessageProtocol();??
  • ????????this.setupMessageQueueConsumer();??
  • ????}??
  • ??
  • ????private?void?setupMessageQueueConsumer()?{??
  • ????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory(messageBrokerUrl);??
  • ????????Connection?connection;??
  • ????????try?{??
  • ????????????connection?=?connectionFactory.createConnection();??
  • ????????????connection.start();??
  • ????????????this.session?=?connection.createSession(this.transacted,?ackMode);??
  • ????????????Destination?adminQueue?=?this.session.createQueue(messageQueueName);??
  • ??
  • ????????????//Setup?a?message?producer?to?respond?to?messages?from?clients,?we?will?get?the?destination??
  • ????????????//to?send?to?from?the?JMSReplyTo?header?field?from?a?Message??
  • ????????????this.replyProducer?=?this.session.createProducer(null);??
  • ????????????this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);??
  • ??
  • ????????????//Set?up?a?consumer?to?consume?messages?off?of?the?admin?queue??
  • ????????????MessageConsumer?consumer?=?this.session.createConsumer(adminQueue);??
  • ????????????consumer.setMessageListener(this);??
  • ????????}?catch?(JMSException?e)?{??
  • ????????????//Handle?the?exception?appropriately??
  • ????????}??
  • ????}??
  • ? ? 總體來說,整個的交互過程并不復雜,只是比較繁瑣。對于請求/應答的方式來說,這種典型交互的過程就是Client端在設定正常發送請求的Queue同時也設定一個臨時的Queue。同時在要發送的message里頭指定要返回消息的destination以及CorelationID,這些就好比是一封信里面所帶的回執。根據這個信息人家才知道怎么給你回信。對于Server端來說則要額外創建一個producer,在處理接收到消息的方法里再利用producer將消息發回去。這一系列的過程看起來很像http協議里面請求-應答的方式,都是一問一答。

    一些應用和改進

    ? ? 回顧前面三種基本的通信方式,我們會發現,他們都存在著一定的共同點,比如說都要初始化ConnectionFactory, Connection, Session等。在使用完之后都要將這些資源關閉。如果每一個實現它的通信端都這么寫一通的話,其實是一種簡單的重復。從工程的角度來看是完全沒有必要的。那么,我們有什么辦法可以減少這種重復呢?

    ? ? 一種簡單的方式就是通過工廠方法封裝這些對象的創建和銷毀,然后簡單的通過調用工廠方法的方式得到他們。另外,既然基本的流程都是在開頭創建資源在結尾銷毀,我們也可以采用Template Method模式的思路。通過繼承一個抽象類,在抽象類里提供了資源的封裝。所有繼承的類只要實現怎么去使用這些資源的方法就可以了。Spring中間的JMSTemplate就提供了這種類似思想的封裝。具體的實現可以參考這篇文章。

    總結

    ? ? ?activemq默認提供了pub-sub, p2p這兩種通信的方式。同時也提供了一些對request-response方式的支持。實際上,不僅僅是activemq,對于所有其他實現JMS規范的產品都能夠提供類似的功能。這里每種方式都不太復雜,主要是創建和管理資源的步驟顯得比較繁瑣。

    總結

    以上是生活随笔為你收集整理的activeMQ的三种通讯模式的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。