publish-subscribe
? ? ?發布訂閱模式有點類似于我們日常生活中訂閱報紙。每年到年尾的時候,郵局就會發一本報紙集合讓我們來選擇訂閱哪一個。在這個表里頭列了所有出版發行的報紙,那么對于我們每一個訂閱者來說,我們可以選擇一份或者多份報紙。比如北京日報、瀟湘晨報等。那么這些個我們訂閱的報紙,就相當于發布訂閱模式里的topic。有很多個人訂閱報紙,也有人可能和我訂閱了相同的報紙。那么,在這里,相當于我們在同一個topic里注冊了。對于一份報紙發行方來說,它和所有的訂閱者就構成了一個1對多的關系。這種關系如下圖所示:
? ? ?現在,假定我們用前面討論的場景來寫一個簡單的示例。我們首先需要定義的是publisher.
publisher
? ? ?publisher是屬于發布信息的一方,它通過定義一個或者多個topic,然后給這些topic發送消息。
? ? publisher的構造函數如下:
public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); }
[java]?view plain
?copypublic?Publisher()?throws?JMSException?{??????????factory?=?new?ActiveMQConnectionFactory(brokerURL);??????????connection?=?factory.createConnection();??????????try?{??????????connection.start();??????????}?catch?(JMSException?jmse)?{??????????????connection.close();??????????????throw?jmse;??????????}??????????session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??????????producer?=?session.createProducer(null);??????}??
我們按照前面說的流程定義了基本的connectionFactory, connection, session, producer。這里代碼就是主要實現初始化的效果。
? ? 接著,我們需要定義一系列的topic讓所有的consumer來訂閱,設置topic的代碼如下:
[java]?view plain
?copyprotected?void?setTopics(String[]?stocks)?throws?JMSException?{??????destinations?=?new?Destination[stocks.length];??????for(int?i?=?0;?i?<?stocks.length;?i++)?{??????????destinations[i]?=?session.createTopic("STOCKS."?+?stocks[i]);??????}??}??
? ? ?這里destinations是一個內部定義的成員變量Destination[]。這里我們總共定義了的topic數取決于給定的參數stocks。
? ? ?在定義好topic之后我們要給這些指定的topic發消息,具體實現的代碼如下:
protected?void?sendMessage(String[]?stocks)?throws?JMSException?{??????for(int?i?=?0;?i?<?stocks.length;?i++)?{??????????Message?message?=?createStockMessage(stocks[i],?session);??????????System.out.println("Sending:?"?+?((ActiveMQMapMessage)message).getContentMap()?+?"?on?destination:?"?+?destinations[i]);??????????producer.send(destinations[i],?message);??????}??}????protected?Message?createStockMessage(String?stock,?Session?session)?throws?JMSException?{??????MapMessage?message?=?session.createMapMessage();??????message.setString("stock",?stock);??????message.setDouble("price",?1.00);??????message.setDouble("offer",?0.01);??????message.setBoolean("up",?true);????????????????return?message;??}
[java]?view plain
?copyprotected?void?sendMessage(String[]?stocks)?throws?JMSException?{??????for(int?i?=?0;?i?<?stocks.length;?i++)?{??????????Message?message?=?createStockMessage(stocks[i],?session);??????????System.out.println("Sending:?"?+?((ActiveMQMapMessage)message).getContentMap()?+?"?on?destination:?"?+?destinations[i]);??????????producer.send(destinations[i],?message);??????}??}????protected?Message?createStockMessage(String?stock,?Session?session)?throws?JMSException?{??????MapMessage?message?=?session.createMapMessage();??????message.setString("stock",?stock);??????message.setDouble("price",?1.00);??????message.setDouble("offer",?0.01);??????message.setBoolean("up",?true);????????????????return?message;??}??
? ? ?前面的代碼很簡單,在sendMessage方法里我們遍歷每個topic,然后給每個topic發送定義的Message消息。
? ? 在定義好前面發送消息的基礎之后,我們調用他們的代碼就很簡單了:
[java]?view plain
?copypublic?static?void?main(String[]?args)?throws?JMSException?{??????if(args.length?<?1)??????????throw?new?IllegalArgumentException();??????????????????????????Publisher?publisher?=?new?Publisher();??????????????????????????publisher.setTopics(args);????????????????for(int?i?=?0;?i?<?10;?i++)?{??????????publisher.sendMessage(args);??????????System.out.println("Publisher?'"?+?i?+?"?price?messages");??????????try?{??????????????Thread.sleep(1000);??????????}?catch(InterruptedException?e)?{??????????????e.printStackTrace();??????????}??????}????????????publisher.close();??}??
? ? ?調用他們的代碼就是我們遍歷所有topic,然后通過sendMessage發送消息。在發送一個消息之后先sleep1秒鐘。要注意的一個地方就是我們使用完資源之后必須要使用close方法將這些資源關閉釋放。close方法關閉資源的具體實現如下:
[java]?view plain
?copypublic?void?close()?throws?JMSException?{??????if?(connection?!=?null)?{??????????connection.close();???????}??}??
consumer
? ? Consumer的代碼也很類似,具體的步驟無非就是1.初始化資源。 2. 接收消息。 3. 必要的時候關閉資源。
? ? 初始化資源可以放到構造函數里面:
[java]?view plain
?copypublic?Consumer()?throws?JMSException?{??????????factory?=?new?ActiveMQConnectionFactory(brokerURL);??????????connection?=?factory.createConnection();??????????connection.start();??????????session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??????}??
? ? ?接收和處理消息的方法有兩種,分為同步和異步的,一般同步的方式我們是通過MessageConsumer.receive()方法來處理接收到的消息。而異步的方法則是通過注冊一個MessageListener的方法,使用MessageConsumer.setMessageListener()。這里我們采用異步的方式實現:
[java]?view plain
?copypublic?static?void?main(String[]?args)?throws?JMSException?{??????Consumer?consumer?=?new?Consumer();??????for?(String?stock?:?args)?{??????Destination?destination?=?consumer.getSession().createTopic("STOCKS."?+?stock);??????MessageConsumer?messageConsumer?=?consumer.getSession().createConsumer(destination);??????messageConsumer.setMessageListener(new?Listener());??????}??}????????public?Session?getSession()?{??????return?session;??}??
? ? ?在前面的代碼里我們先找到同樣的topic,然后遍歷所有的topic去獲得消息。對于消息的處理我們專門通過Listener對象來負責。
? ? Listener對象的職責很簡單,主要就是處理接收到的消息:
[java]?view plain
?copypublic?class?Listener?implements?MessageListener?{????????public?void?onMessage(Message?message)?{??????????try?{??????????????MapMessage?map?=?(MapMessage)message;??????????????String?stock?=?map.getString("stock");??????????????double?price?=?map.getDouble("price");??????????????double?offer?=?map.getDouble("offer");??????????????boolean?up?=?map.getBoolean("up");??????????????DecimalFormat?df?=?new?DecimalFormat(?"#,###,###,##0.00"?);??????????????System.out.println(stock?+?"\t"?+?df.format(price)?+?"\t"?+?df.format(offer)?+?"\t"?+?(up?"up":"down"));??????????}?catch?(Exception?e)?{??????????????e.printStackTrace();??????????}??????}????}??
? ? 它實現了MessageListener接口,里面的onMessage方法就是在接收到消息之后會被調用的方法。
? ? 現在,通過實現前面的publisher和consumer我們已經實現了pub-sub模式的一個實例。仔細回想它的步驟的話,主要就是要兩者設定一個共同的topic,有了這個topic之后他們可以實現一方發消息另外一方接收。另外,為了連接到具體的message server,這里是使用了連接tcp://localhost:16161作為定義ActiveMQConnectionFactory的路徑。在publisher端通過session創建producer,根據指定的參數創建destination,然后將消息和destination作為producer.send()方法的參數發消息。在consumer端也要創建類似的connection, session。通過session得到destination,再通過session.createConsumer(destination)來得到一個MessageConsumer對象。有了這個MessageConsumer我們就可以自行選擇是直接同步的receive消息還是注冊listener了。
p2p
? ? p2p的過程則理解起來更加簡單。它好比是兩個人打電話,這兩個人是獨享這一條通信鏈路的。一方發送消息,另外一方接收,就這么簡單。在實際應用中因為有多個用戶對使用p2p的鏈路,它的通信場景如下圖所示:
? ? 我們再來看看一個p2p的示例:
? ? 在p2p的場景里,相互通信的雙方是通過一個類似于隊列的方式來進行交流。和前面pub-sub的區別在于一個topic有一個發送者和多個接收者,而在p2p里一個queue只有一個發送者和一個接收者。
發送者
? ? 和前面的示例非常相似,我們構造函數里需要初始化的內容基本上差不多:
?
[java]?view plain
?copypublic?Publisher()?throws?JMSException?{??????factory?=?new?ActiveMQConnectionFactory(brokerURL);??????connection?=?factory.createConnection();??????connection.start();??????session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??????producer?=?session.createProducer(null);??}??
? ? ?發送消息的方法如下:
?
?
[java]?view plain
?copypublic?void?sendMessage()?throws?JMSException?{??????for(int?i?=?0;?i?<?jobs.length;?i++)??????{??????????String?job?=?jobs[i];??????????Destination?destination?=?session.createQueue("JOBS."?+?job);??????????Message?message?=?session.createObjectMessage(i);??????????System.out.println("Sending:?id:?"?+?((ObjectMessage)message).getObject()?+?"?on?queue:?"?+?destination);??????????producer.send(destination,?message);??????}??}??
? ? ?這里我們定義了一個jobs的數組,通過遍歷這個數組來創建不同的job queue。這樣就相當于建立了多個點對點通信的鏈路。
?
? ? 消息發送者的啟動代碼如下:
?
[java]?view plain
?copypublic?static?void?main(String[]?args)?throws?JMSException?{??????Publisher?publisher?=?new?Publisher();??????for(int?i?=?0;?i?<?10;?i++)?{??????????publisher.sendMessage();??????????System.out.println("Published?"?+?i?+?"?job?messages");??????try?{??????????????Thread.sleep(1000);??????????}?catch?(InterruptedException?x)?{??????????e.printStackTrace();??????????}??????}??????publisher.close();??}??
? ? ?我們在這里發送10條消息,當然,在每個sendMessage的方法里實際上是針對每個queue發送了10條。
?
?
接收者
? ? ?接收者的代碼很簡單,一個構造函數初始化所有的資源:
?
[java]?view plain
?copypublic?Consumer()?throws?JMSException?{??????????factory?=?new?ActiveMQConnectionFactory(brokerURL);??????????connection?=?factory.createConnection();??????????connection.start();??????????session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);??????}??
? ? 還有一個就是注冊消息處理的對象:
?
?
[java]?view plain
?copypublic?static?void?main(String[]?args)?throws?JMSException?{??????????Consumer?consumer?=?new?Consumer();??????????for?(String?job?:?consumer.jobs)?{??????????????Destination?destination?=?consumer.getSession().createQueue("JOBS."?+?job);??????????????MessageConsumer?messageConsumer?=?consumer.getSession().createConsumer(destination);??????????????messageConsumer.setMessageListener(new?Listener(job));??????????}??????}????????????public?Session?getSession()?{??????????return?session;??????}??
? ? ?具體注冊的對象處理方法和前面還是類似,實現MessageListener接口就可以了。
?
[java]?view plain
?copyimport?javax.jms.Message;??import?javax.jms.MessageListener;??import?javax.jms.ObjectMessage;????public?class?Listener?implements?MessageListener?{????????private?String?job;????????????public?Listener(String?job)?{??????????this.job?=?job;??????}????????public?void?onMessage(Message?message)?{??????????try?{????????????????????????????System.out.println(job?+?"?id:"?+?((ObjectMessage)message).getObject());??????????}?catch?(Exception?e)?{??????????????e.printStackTrace();??????????}??????}????}??? ? ?這里代碼和前面pub-sub的具體實現代碼非常相似,就不再贅述。
?
? ? ?現在如果我們比較一下pub-sub和p2p模式的具體實現步驟的話,我們會發現他們基本的處理流程都是類似的,除了在pub-sub中要通過createTopic來設置topic,而在p2p中要通過createQueue來創建通信隊列。他們之間存在著很多的重復之處,在具體的開發過程中,我們是否可以進行一些工程上的優化呢?別急,后面我們會討論到的。
request-response
? ? 和前面兩種方式比較起來,request-response的通信方式很常見,但是不是默認提供的一種模式。在前面的兩種模式中都是一方負責發送消息而另外一方負責處理。而我們實際中的很多應用相當于一種一應一答的過程,需要雙方都能給對方發送消息。于是請求-應答的這種通信方式也很重要。它也應用的很普遍。?
? ? ?請求-應答方式并不是JMS規范系統默認提供的一種通信方式,而是通過在現有通信方式的基礎上稍微運用一點技巧實現的。下圖是典型的請求-應答方式的交互過程:
? ? ?在JMS里面,如果要實現請求/應答的方式,可以利用JMSReplyTo和JMSCorrelationID消息頭來將通信的雙方關聯起來。另外,QueueRequestor和TopicRequestor能夠支持簡單的請求/應答過程。
? ? 現在,如果我們要實現這么一個過程,在發送請求消息并且等待返回結果的client端的流程如下:?
[java]?view plain
?copy??Destination?tempDest?=?session.createTemporaryQueue();??MessageConsumer?responseConsumer?=?session.createConsumer(tempDest);??...??????message.setJMSReplyTo(tempDest)??message.setJMSCorrelationID(myCorrelationID);????producer.send(message);??? ? ?client端創建一個臨時隊列并在發送的消息里指定了發送返回消息的destination以及correlationID。那么在處理消息的server端得到這個消息后就知道該發送給誰了。Server端的大致流程如下:
?
[java]?view plain
?copypublic?void?onMessage(Message?request)?{??????Message?response?=?session.createMessage();????response.setJMSCorrelationID(request.getJMSCorrelationID())??????producer.send(request.getJMSReplyTo(),?response)??}??? ? 這里我們是用server端注冊MessageListener,通過設置返回信息的CorrelationID和JMSReplyTo將信息返回。
? ? 以上就是發送和接收消息的雙方的大致程序結構。具體的實現代碼如下:
?Client:
[java]?view plain
?copypublic?Client()?{??????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory("tcp://localhost:61616");??????????Connection?connection;??????????try?{??????????????connection?=?connectionFactory.createConnection();??????????????connection.start();??????????????Session?session?=?connection.createSession(transacted,?ackMode);??????????????Destination?adminQueue?=?session.createQueue(clientQueueName);??????????????????????????????this.producer?=?session.createProducer(adminQueue);??????????????this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);??????????????????????????????????????????????????????????Destination?tempDest?=?session.createTemporaryQueue();??????????????MessageConsumer?responseConsumer?=?session.createConsumer(tempDest);??????????????????????????????responseConsumer.setMessageListener(this);??????????????????????????????TextMessage?txtMessage?=?session.createTextMessage();??????????????txtMessage.setText("MyProtocolMessage");????????????????????????????????????????????txtMessage.setJMSReplyTo(tempDest);??????????????????????????????????????????????????????????????????????????????????????String?correlationId?=?this.createRandomString();??????????????txtMessage.setJMSCorrelationID(correlationId);??????????????this.producer.send(txtMessage);??????????}?catch?(JMSException?e)?{????????????????????????}??????}??? ? 這里的代碼除了初始化構造函數里的參數還同時設置了兩個destination,一個是自己要發送消息出去的destination,在session.createProducer(adminQueue);這一句設置。另外一個是自己要接收的消息destination, 通過Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest); 這兩句指定了要接收消息的目的地。這里是用的一個臨時隊列。在前面指定了返回消息的通信隊列之后,我們需要通知server端知道發送返回消息給哪個隊列。于是txtMessage.setJMSReplyTo(tempDest);指定了這一部分,同時txtMessage.setJMSCorrelationID(correlationId);方法主要是為了保證每次發送回來請求的server端能夠知道對應的是哪個請求。這里一個請求和一個應答是相當于對應一個相同的序列號一樣。
? ? 同時,因為client端在發送消息之后還要接收server端返回的消息,所以它也要實現一個消息receiver的功能。這里采用實現MessageListener接口的方式:
[java]?view plain
?copypublic?void?onMessage(Message?message)?{??????????String?messageText?=?null;??????????try?{??????????????if?(message?instanceof?TextMessage)?{??????????????????TextMessage?textMessage?=?(TextMessage)?message;??????????????????messageText?=?textMessage.getText();??????????????????System.out.println("messageText?=?"?+?messageText);??????????????}??????????}?catch?(JMSException?e)?{????????????????????????}??????}???
Server:
? ? ?這里server端要執行的過程和client端相反,它是先接收消息,在接收到消息后根據提供的JMSCorelationID來發送返回的消息:
[java]?view plain
?copypublic?void?onMessage(Message?message)?{??????????try?{??????????????TextMessage?response?=?this.session.createTextMessage();??????????????if?(message?instanceof?TextMessage)?{??????????????????TextMessage?txtMsg?=?(TextMessage)?message;??????????????????String?messageText?=?txtMsg.getText();??????????????????response.setText(this.messageProtocol.handleProtocolMessage(messageText));??????????????}??????????????????????????????????????????????????????????response.setJMSCorrelationID(message.getJMSCorrelationID());????????????????????????????????????????????this.replyProducer.send(message.getJMSReplyTo(),?response);??????????}?catch?(JMSException?e)?{????????????????????????}??????}??? ? 前面,在replyProducer.send()方法里,message.getJMSReplyTo()就得到了要發送消息回去的destination。
? ? 另外,設置這些發送返回信息的replyProducer的信息主要在構造函數相關的方法里實現了:
[java]?view plain
?copypublic?Server()?{??????????try?{????????????????????????????BrokerService?broker?=?new?BrokerService();??????????????broker.setPersistent(false);??????????????broker.setUseJmx(false);??????????????broker.addConnector(messageBrokerUrl);??????????????broker.start();??????????}?catch?(Exception?e)?{????????????????????????}????????????????????????????????this.messageProtocol?=?new?MessageProtocol();??????????this.setupMessageQueueConsumer();??????}????????private?void?setupMessageQueueConsumer()?{??????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory(messageBrokerUrl);??????????Connection?connection;??????????try?{??????????????connection?=?connectionFactory.createConnection();??????????????connection.start();??????????????this.session?=?connection.createSession(this.transacted,?ackMode);??????????????Destination?adminQueue?=?this.session.createQueue(messageQueueName);????????????????????????????????????????????this.replyProducer?=?this.session.createProducer(null);??????????????this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);??????????????????????????????MessageConsumer?consumer?=?this.session.createConsumer(adminQueue);??????????????consumer.setMessageListener(this);??????????}?catch?(JMSException?e)?{????????????????????????}??????}??? ? 總體來說,整個的交互過程并不復雜,只是比較繁瑣。對于請求/應答的方式來說,這種典型交互的過程就是Client端在設定正常發送請求的Queue同時也設定一個臨時的Queue。同時在要發送的message里頭指定要返回消息的destination以及CorelationID,這些就好比是一封信里面所帶的回執。根據這個信息人家才知道怎么給你回信。對于Server端來說則要額外創建一個producer,在處理接收到消息的方法里再利用producer將消息發回去。這一系列的過程看起來很像http協議里面請求-應答的方式,都是一問一答。
一些應用和改進
? ? 回顧前面三種基本的通信方式,我們會發現,他們都存在著一定的共同點,比如說都要初始化ConnectionFactory, Connection, Session等。在使用完之后都要將這些資源關閉。如果每一個實現它的通信端都這么寫一通的話,其實是一種簡單的重復。從工程的角度來看是完全沒有必要的。那么,我們有什么辦法可以減少這種重復呢?
? ? 一種簡單的方式就是通過工廠方法封裝這些對象的創建和銷毀,然后簡單的通過調用工廠方法的方式得到他們。另外,既然基本的流程都是在開頭創建資源在結尾銷毀,我們也可以采用Template Method模式的思路。通過繼承一個抽象類,在抽象類里提供了資源的封裝。所有繼承的類只要實現怎么去使用這些資源的方法就可以了。Spring中間的JMSTemplate就提供了這種類似思想的封裝。具體的實現可以參考這篇文章。
總結
? ? ?activemq默認提供了pub-sub, p2p這兩種通信的方式。同時也提供了一些對request-response方式的支持。實際上,不僅僅是activemq,對于所有其他實現JMS規范的產品都能夠提供類似的功能。這里每種方式都不太復雜,主要是創建和管理資源的步驟顯得比較繁瑣。
總結
以上是生活随笔為你收集整理的activeMQ的三种通讯模式的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。