日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ActiveMQ入门系列二:入门代码实例(点对点模式)

發(fā)布時間:2025/4/16 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ActiveMQ入门系列二:入门代码实例(点对点模式) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

在上一篇《ActiveMQ入門系列一:認(rèn)識并安裝ActiveMQ(Windows下)》中,大致介紹了ActiveMQ和一些概念,并下載、安裝、啟動他,還訪問了他的控制臺頁面。

這篇,就用代碼實(shí)例說下如何實(shí)現(xiàn)消息的生產(chǎn)和消費(fèi)。

一、理論基礎(chǔ)

同RabbitMQ一樣,ActiveMQ中也是有兩種模式:

  • 點(diǎn)對點(diǎn)模式(Point to Point,簡寫為PTP)
  • 發(fā)布/訂閱模式(Publish & Subscribe,簡寫為Pub & Sub)

通過上一篇我們知道了制造消息的應(yīng)用叫生產(chǎn)者(Producer),生產(chǎn)者在生產(chǎn)了消息后會發(fā)送消息到目的地(Destination),到達(dá)消費(fèi)和處理消息的應(yīng)用(也就是消費(fèi)者Consumer)。這里的兩種模式就通過對應(yīng)不同的消息目的地(Destination)來實(shí)現(xiàn),PTP對應(yīng)Queue(隊(duì)列)、Pub&Sub對應(yīng)Topic(主題)。

今天就詳細(xì)介紹下PTP和Queue,下一篇介紹Pub & Sub和Topic。

在PTP模式的示意圖:

?

  • 消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消息消費(fèi)者從queue中取出并且消費(fèi)消息。
  • 消息被消費(fèi)以后,queue中不再有存儲,所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。
  • Queue支持存在多個消費(fèi)者,但是對一個消息而言,只會有一個消費(fèi)者可以消費(fèi)、其它的則不能消費(fèi)此消息了。
  • 當(dāng)消費(fèi)者不存在時,消息會一直保存,直到有消費(fèi)消費(fèi)。

在PTP中,代碼實(shí)現(xiàn)有兩種方式:消費(fèi)者主動消費(fèi)和消費(fèi)者監(jiān)聽消費(fèi),下面就分別說下。

二、消費(fèi)者主動消費(fèi)

主動消費(fèi)是最基本也是最簡單的消費(fèi)方式,先上代碼:

  • 創(chuàng)建maven工程并引入依賴 <dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-core</artifactId><version>5.7.0</version></dependency>
  • 實(shí)現(xiàn)生產(chǎn)者 package com.sam.ptp;
    import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /*** @author JAVA開發(fā)老菜鳥**/ public class Producer {public static final String QUEUE_NAME = "ptp-demo";//隊(duì)列名public void producer(String message) throws JMSException {ConnectionFactory factory = null;Connection connection = null;Session session = null;MessageProducer producer = null;try {/*** 1.創(chuàng)建連接工廠* 創(chuàng)建工廠,構(gòu)造方法有三個參數(shù):分別是用戶名、密碼、連接地址* 無參構(gòu)造:有默認(rèn)的連接地址,localhost* 一個參數(shù):無驗(yàn)證模式,無用戶的認(rèn)證* 三個參數(shù):有認(rèn)證和連接地址,我這里使用三個參數(shù)的構(gòu)造方法*/factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");/*** 2.創(chuàng)建連接,有兩個方法(我這里使用無參數(shù)的)* 無參數(shù)* 有參數(shù):用戶名、密碼;*/connection = factory.createConnection();/*** 3.啟動連接* 生產(chǎn)者可以不用調(diào)用start()方法啟動,因?yàn)樵诎l(fā)送消息的時候回進(jìn)行檢查* 如果未啟動連接,會自動啟動。* 如果有特殊配置,需要配置完成后再啟動連接*/connection.start();/*** 4.用連接創(chuàng)建會話* 有兩個參數(shù):是否需要事務(wù)、消息確認(rèn)機(jī)制* 如果支持事務(wù),對于生產(chǎn)者來說第二個參數(shù)就無效了,這個時候第二個參數(shù)建議傳入Session.SESSION_TRANSACTED* 如果不支持事務(wù),第二個參數(shù)有效且必須傳遞** AUTO_ACKNOWLEDGE:自動確認(rèn),消息處理后自動確認(rèn)(商業(yè)開發(fā)不推薦)* CLIENT_ACKNOWLEDGE:客戶端手動確認(rèn),消費(fèi)者處理后必須手動確認(rèn)* DUPS_OK_ACKNOWLEDGE:有副本的客戶端手動確認(rèn),消息可以多次處理(不建議)*/session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);/*** 5.用會話創(chuàng)建目的地(隊(duì)列)、生產(chǎn)者、消息* 隊(duì)列名是隊(duì)列的唯一標(biāo)記* 創(chuàng)建生產(chǎn)者的時候可以指定目的地,也可以在發(fā)送消息的時候再指定*/Destination destination = session.createQueue(QUEUE_NAME);producer = session.createProducer(destination);TextMessage textMessage = session.createTextMessage(message);/*** 6.生產(chǎn)者發(fā)送消息到目的地*/producer.send(textMessage);System.out.println("消息發(fā)送成功");} catch(Exception ex){throw ex;} finally {/*** 7.釋放資源*/if(producer != null){producer.close();}if(session != null){session.close();}if(connection != null){connection.close();}}}public static void main(String[] args){Producer producer = new Producer();try{producer.producer("hello, activemq");} catch (Exception ex){ex.printStackTrace();}} }

    ?

  • 實(shí)現(xiàn)消費(fèi)者 package com.sam.ptp;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** @author JAVA開發(fā)老菜鳥** 主動消費(fèi)*/ public class Consumer {public String consumer() throws JMSException {ConnectionFactory factory = null;Connection connection = null;Session session = null;MessageConsumer consumer = null;try {factory = new ActiveMQConnectionFactory("admin","admin","tcp://localhost:61616");connection = factory.createConnection();/*** 消費(fèi)者必須啟動連接,否則無法消費(fèi)*/connection.start();session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(Producer.QUEUE_NAME);consumer = session.createConsumer(destination);/*** 獲取隊(duì)列消息*/Message message = consumer.receive();String text = ((TextMessage) message).getText();return text;} catch(Exception ex){throw ex;} finally {/*** 7.釋放資源*/if(consumer != null){consumer.close();}if(session != null){session.close();}if(connection != null){connection.close();}}}public static void main(String[] args){Consumer consumer = new Consumer();try{String message = consumer.consumer();System.out.println("消息消費(fèi)成功:" + message);} catch (Exception ex){ex.printStackTrace();}} }

    ?

  • 好,這樣代碼就寫好了,我們來測試下。

    1.先運(yùn)行生產(chǎn)者,我發(fā)現(xiàn)報(bào)錯了。。。

    好吧,原來是我這次沒有啟動ActiveMQ,被自己蠢哭了。。。

    啟動ActiveMQ之后,再運(yùn)行生產(chǎn)者,成功了。

    去看下控制臺頁面的變化,隊(duì)列里面多了個“ptp-demo”隊(duì)列,這個就是我們生產(chǎn)者代碼里面的隊(duì)列名,并且能看到該隊(duì)列的基本情況:

    從左到右依次為,有待消費(fèi)的消息1條、消費(fèi)者0個、已經(jīng)發(fā)送的消息1條、已經(jīng)消費(fèi)的消息0條

    2.接下來運(yùn)行消費(fèi)者,成功

    再去看下控制臺頁面,發(fā)現(xiàn)隊(duì)列信息變了,從左到右依次為:有待消費(fèi)的消息0條、消費(fèi)者0個、已經(jīng)發(fā)送的消息1條、已經(jīng)消費(fèi)的消息1條

    也就是說,消息真的被消費(fèi)了!

    ?代碼寫完了,也按照預(yù)期執(zhí)行完了,我們現(xiàn)在再回過頭來分析下消費(fèi)者的代碼,會發(fā)現(xiàn)他在consumer.receive()之后不會再消費(fèi)其他消息了,即便后面再有消息被生產(chǎn)出來也不會再消費(fèi)。也就是說只能在運(yùn)行后消費(fèi)一次消息,這個就是主動消費(fèi)。

    如果想要循環(huán)消費(fèi)多次產(chǎn)生的消息的話,怎么辦呢?請用下面的監(jiān)聽消費(fèi)

    ?

    三、消費(fèi)者監(jiān)聽消費(fèi)

    還是先上代碼,代碼結(jié)構(gòu)同主動消費(fèi)類似,有細(xì)微差別,具體代碼不貼了,可以到我的GitHub或碼云上獲取源碼

  • 首先為了區(qū)分,我把隊(duì)列名改了 public static final String QUEUE_NAME = "ptp-listener-demo";//隊(duì)列名

    ?

  • 生產(chǎn)者和消費(fèi)者的消息確認(rèn)方式都改成了客戶端手動確認(rèn),不再自動確認(rèn),手動確認(rèn)有個好處就是可以防止消息沒有被正常消費(fèi)而丟失,這個同RabbitMQ機(jī)制一樣 session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

    ?

  • 生產(chǎn)者生產(chǎn)消息的時候,為了方便我改成了一次性發(fā)送10條 /*** 6.創(chuàng)建消息并且生產(chǎn)者發(fā)送消息到目的地*/for(int num = 0; num < 10; num++){TextMessage textMessage = session.createTextMessage(message + num);producer.send(textMessage);System.out.println("消息發(fā)送成功"+textMessage.getText());}

    ?

  • 關(guān)鍵點(diǎn)來了,在消費(fèi)者上加了一個監(jiān)聽器 /*** 注冊監(jiān)聽器,隊(duì)列中的消息變化會自動觸發(fā)監(jiān)聽器,接收并自動處理消息** 監(jiān)聽器一旦注冊,永久有效,一直到程序關(guān)閉* 監(jiān)聽器可以注冊多個,相當(dāng)于集群* activemq自動輪詢多個監(jiān)聽器,實(shí)現(xiàn)并行處理*/consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {try {//需要手動確認(rèn)消息 message.acknowledge();TextMessage om = (TextMessage) message;String data = om.getText();System.out.println(data);} catch (JMSException e) {e.printStackTrace();}}});

    ?

  • 執(zhí)行生產(chǎn)者:

    ?

    執(zhí)行消費(fèi)者,消息全部被消費(fèi)了:

    ?

    再執(zhí)行2遍生產(chǎn)者,消息同樣都被消費(fèi)了。?

    控制臺頁面多了個隊(duì)列,由于監(jiān)聽中的消費(fèi)者沒有關(guān)閉,因此這里能看到消費(fèi)者數(shù)量為1,我執(zhí)行了三遍生產(chǎn)者,因此消息有30條。

    還沒完,繼續(xù)...

    我們這次先啟動2個消費(fèi)者,然后啟動生產(chǎn)者

    兩個生產(chǎn)者分別消費(fèi)了消息0,2,4,6,8和1,3,5,7,9

    也就是說兩個消費(fèi)者都監(jiān)聽到了消息,并且activemq自動輪詢兩個監(jiān)聽器發(fā)送消息。

    ?

    好,到這里,ActiveMQ的點(diǎn)對點(diǎn)模式就介紹完了。下一篇介紹發(fā)布訂閱模式,敬請期待

    轉(zhuǎn)載于:https://www.cnblogs.com/sam-uncle/p/10988930.html

    總結(jié)

    以上是生活随笔為你收集整理的ActiveMQ入门系列二:入门代码实例(点对点模式)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 按摩ⅹxxx性hd中国 | 天天色天天射综合网 | 青青草视频免费看 | 日韩免费一二三区 | 美国成人免费视频 | 免费精品视频在线观看 | av中文在线资源 | 无码aⅴ精品一区二区三区浪潮 | 无毛av| 日韩高清一二三区 | 草草影院1 | 女仆裸体打屁屁羞羞免费 | 无码人妻精品一区二区三区99v | 天天久久综合网 | 男生女生羞羞网站 | 精品久久久久久久久中文字幕 | 国产av一区二区三区传媒 | 亚洲一级av无码毛片精品 | 成人激情视频在线播放 | sesese99| 午夜宫| 91禁看片| 中文字幕1 | 我会温柔一点的日剧 | 亚洲AV无码国产精品播放在线 | 91国语对白| 人妻在客厅被c的呻吟 | 中文字幕一区二区视频 | 99久久国产宗和精品1上映 | 欧美在线v | 大香伊人中文字幕精品 | 伊人色播| 久久精品国产一区二区三区 | 黄色片子免费 | 亚洲国产无线乱码在线观看 | 亚洲日本成人在线观看 | 欧美色亚洲色 | 免费一区二区三区视频在线 | 91成人免费电影 | 欧美不卡一区二区 | 韩国三级av | 不卡一区二区在线观看 | 成人自拍视频 | 91原视频| 亚洲女人久久久 | 伊人久久大香线蕉成人综合网 | 亚洲一区不卡 | 91免费看黄 | 国产精品.www | 国产精品久久久久久久久借妻 | 久久影音先锋 | 国产黄色在线免费观看 | 孕妇爱爱视频 | 亚洲色图88 | 久久中字 | 久久国产黄色片 | 色视频免费在线观看 | 国产精品99精品无码视亚 | 国产欧美日韩精品区一区二污污污 | 青草精品视频 | 一级片在线免费观看 | 蜜桃视频在线网站 | 欧美色射 | 日韩精品免费 | 国产人妖一区二区三区 | 国产欧美日韩在线 | 亚洲一级二级片 | 人人爱超碰 | 摸大乳喷奶水www视频 | youjizz自拍 | 亚洲av无码一区二区三区在线播放 | 91蜜桃臀久久一区二区 | 国产视频久久久久久久 | 最新视频在线观看 | 国产在线视频你懂得 | 印度午夜性春猛xxx交 | www.久久久 | 天堂在线官网 | 青青草亚洲 | 女优一区| 99精品99| 麻豆视频在线免费观看 | 可以看黄色的网站 | 果冻传媒av | av男人的天堂在线 | 欧美一区二区高清视频 | brazzers欧美极品少妇 | 久久精品一区二区三区四区 | 欧av在线 | 打屁股无遮挡网站 | 欧美日韩一卡二卡 | a v在线视频 | 精品少妇白浆小泬60P | 少妇高潮一区二区三区99小说 | 影音先锋中文字幕一区 | 美女av在线免费观看 | 亚洲福利在线观看 | 香蕉视频黄在线观看 | 日本打白嫩屁股视频 |