分布式消息中间件-Rocketmq
生活随笔
收集整理的這篇文章主要介紹了
分布式消息中间件-Rocketmq
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
簡(jiǎn)述
現(xiàn)在生產(chǎn)中用的最多的消息隊(duì)列有Activemq,rabbitmq,kafka,rocketmq等。JMS規(guī)范
rocketmq雖然不完全基于jms規(guī)范,但是他參考了jms規(guī)范和 CORBA Notification 規(guī)范等, 可以說(shuō)是青出于藍(lán)而勝于藍(lán),什么是jms呢
jms其實(shí)就是類(lèi)似于jdbc的一套接口規(guī)范,但不同的是他是面向的消息服務(wù),提供一套標(biāo)準(zhǔn)API接口, 大部分廠商都會(huì)參考jms規(guī)范,不過(guò)我們后面要講到的rocketmq卻沒(méi)有嚴(yán)格遵守jms規(guī)范,后面我們會(huì)講到。一些常見(jiàn)的jms廠商有:IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ, 還有APACHE開(kāi)源的ActiveMQ。這里面Activemq這個(gè)也是我接觸到的第一個(gè)mq,現(xiàn)在市場(chǎng)份額也是很大的, 京東商城采用的就是這個(gè)。基本概念
發(fā)送者( Sender)也就是消息的生產(chǎn)者,俗的將就是創(chuàng)建并發(fā)送消息的JMS客戶(hù)端。接收者( Receiver)也就是消息消費(fèi)者,接收訂制消息的并按照相應(yīng)的業(yè)務(wù)邏輯進(jìn)行處理,最終將結(jié)果反饋給mq的服務(wù)端。點(diǎn)對(duì)點(diǎn)( Point-to-Point(P2P) )點(diǎn)對(duì)點(diǎn)就是一對(duì)一的關(guān)系,一個(gè)消息發(fā)出只有一個(gè)接受者所處理。每個(gè)消息都被發(fā)送到一個(gè)特定的隊(duì)列,接收者從隊(duì)列中獲取消息。隊(duì)列保留著消息,直到他們被消費(fèi)或超時(shí)。發(fā)布訂閱( Publish/Subscribe(Pub/Sub) )1、客戶(hù)端將消息發(fā)送到主題。多個(gè)發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞給多個(gè)訂閱者。消息隊(duì)列(Queue) 一個(gè)容納那些被發(fā)送的等待閱讀的消息的區(qū)域。與隊(duì)列名字所暗示的意思不同, 消息的接受順序并不一定要與消息的發(fā)送順序相同。一旦一個(gè)消息被閱讀,該消息將被從隊(duì)列中移走。主題(Topic)一種支持發(fā)送消息給多個(gè)訂閱者的機(jī)制。發(fā)布者(Publisher) 同生產(chǎn)者訂閱者(Subscriber) 針對(duì)同一主題的多個(gè)消費(fèi)者對(duì)象模型
(1) ConnectionFactory創(chuàng)建Connection對(duì)象的工廠,針對(duì)兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種(很顯然是基于點(diǎn)對(duì)點(diǎn)和和發(fā)布訂閱的兩種方式分別創(chuàng)建連接工廠的)。 可以通過(guò)JNDI來(lái)查找ConnectionFactory對(duì)象。(2) DestinationDestination的意思是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說(shuō)消息消費(fèi)者的消息來(lái)源。對(duì)于消息生產(chǎn)者來(lái)說(shuō), 它的Destination是某個(gè)隊(duì)列(Queue)或某個(gè)主題(Topic);對(duì)于消息消費(fèi)者來(lái)說(shuō), 它的Destination也是某個(gè)隊(duì)列或主題(即消息來(lái)源)。所以, Destination實(shí)際上就是兩種類(lèi)型的對(duì)象:Queue、Topic可以通過(guò)JNDI來(lái)查找Destination。(3) ConnectionConnection表示在客戶(hù)端和JMS系統(tǒng)之間建立的鏈接(對(duì)TCP/IP socket的包裝)。 Connection可以產(chǎn)生一個(gè)或多個(gè)Session。跟ConnectionFactory一樣, Connection也有兩種類(lèi)型:QueueConnection和TopicConnection。(4) SessionSession是我們操作消息的接口。可以通過(guò)session創(chuàng)建生產(chǎn)者、消費(fèi)者、消息等。 Session提供了事務(wù)的功能。當(dāng)我們需要使用session發(fā)送/接收多個(gè)消息時(shí), 可以將這些發(fā)送/接收動(dòng)作放到一個(gè)事務(wù)中。同樣,也分QueueSession和TopicSession。(5) 消息的生產(chǎn)者消息生產(chǎn)者由Session創(chuàng)建,并用于將消息發(fā)送到Destination。 同樣,消息生產(chǎn)者分兩種類(lèi)型:QueueSender和TopicPublisher。 可以調(diào)用消息生產(chǎn)者的方法(send或publish方法)發(fā)送消息。(6) 消息消費(fèi)者 消息消費(fèi)者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息。 兩種類(lèi)型:QueueReceiver和TopicSubscriber。 可分別通過(guò)session的createReceiver(Queue)或createSubscriber(Topic)來(lái)創(chuàng)建。 當(dāng)然,也可以session的creatDurableSubscriber方法來(lái)創(chuàng)建持久化的訂閱者。(7) MessageListener消息監(jiān)聽(tīng)器。如果注冊(cè)了消息監(jiān)聽(tīng)器,一旦消息到達(dá),將自動(dòng)調(diào)用監(jiān)聽(tīng)器的onMessage方法。 我們后面消息消費(fèi)還會(huì)看到。消息消費(fèi)
在JMS中,消息的產(chǎn)生和消息是異步的。對(duì)于消費(fèi)來(lái)說(shuō),JMS的消息者可以通過(guò)兩種方式來(lái)消費(fèi)消息。○ 同步 訂閱者或接收者調(diào)用receive方法來(lái)接收消息,receive方法在能夠接收到消息之前(或超時(shí)之前)將一直阻塞○ 異步 訂閱者或接收者可以注冊(cè)為一個(gè)消息監(jiān)聽(tīng)器。當(dāng)消息到達(dá)之后,系統(tǒng)自動(dòng)調(diào)用監(jiān)聽(tīng)器的onMessage方法。activemq的部分代碼來(lái)簡(jiǎn)單說(shuō)明一下上面說(shuō)道的一些JMS規(guī)范
public void init(){try {//創(chuàng)建一個(gè)鏈接工廠(用戶(hù)名,密碼,broker的url地址)connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);//從工廠中創(chuàng)建一個(gè)鏈接connection = connectionFactory.createConnection();//開(kāi)啟鏈接connection.start();//創(chuàng)建一個(gè)會(huì)話(huà)session = connection.createSession(true,Session.SESSION_TRANSACTED);} catch (JMSException e) {e.printStackTrace();}}公共部分:也就是說(shuō)不管你是消息的生產(chǎn)者還是消息的消費(fèi)者都需要這些步驟 1.首先我們需要?jiǎng)?chuàng)建一個(gè)連接工廠,當(dāng)然這里我們需要輸入用戶(hù)性和密碼還有就是broker的url 2.然后我們根據(jù)連接工廠創(chuàng)建了一個(gè)連接,此刻這個(gè)工廠并沒(méi)有和broker建立連接 3.調(diào)用start方法就和broker建立了連接,這里我大概解釋一下broker 4.創(chuàng)建一個(gè)session,上面我們提到過(guò)所有的消息操作都是與session進(jìn)行的public void sendMsg(String queueName){try {//創(chuàng)建一個(gè)消息隊(duì)列(此處也就是在創(chuàng)建Destination)Queue queue = session.createQueue(queueName);//消息生產(chǎn)者M(jìn)essageProducer messageProducer = null;if(threadLocal.get()!=null){messageProducer = threadLocal.get();}else{messageProducer = session.createProducer(queue);threadLocal.set(messageProducer);}while(true){Thread.sleep(1000);int num = count.getAndIncrement();//創(chuàng)建一條消息TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:生產(chǎn)消息,count:"+num);//發(fā)送消息messageProducer.send(msg);//提交事務(wù)session.commit();}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}生產(chǎn):配置完上面的公共部分我們就迫不及待的把消息生產(chǎn)出來(lái)吧,我這邊說(shuō)的是點(diǎn)對(duì)點(diǎn)的方式
1.通過(guò)session創(chuàng)建一個(gè)Destination,我這邊直接就用了queue了 2.接下來(lái)我們需要?jiǎng)?chuàng)建一個(gè)消息的生產(chǎn)者 3.我這邊就循環(huán)每1s發(fā)送一條消息 4.這邊看到我們的消息也是用session來(lái)創(chuàng)建的,這里面我們用的是文本的消息類(lèi)型 5.發(fā)送消息 6.提交這次發(fā)送,至此我們的消息就發(fā)送到了broker上了,用過(guò)activemq的同學(xué)都知道, activemq提供了一個(gè)很好用的界面可以查到你的消息的狀態(tài),包括是否消費(fèi)等消費(fèi):消費(fèi)我們上面也提到了兩種方式,同步和異步,我這邊準(zhǔn)備了兩份代碼分別說(shuō)明了一下public void doMessage(String queueName){try {//創(chuàng)建DestinationQueue queue = session.createQueue(queueName);MessageConsumer consumer = null;while(true){Thread.sleep(1000);TextMessage msg = (TextMessage) consumer.receive();if(msg!=null) {msg.acknowledge();System.out.println(Thread.currentThread().getName()+": Consumer:我是消費(fèi)者,我正在消費(fèi)Msg"+msg.getText()+"--->"+count.getAndIncrement());}else {break;}}} catch (JMSException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}同步:可以看到消息會(huì)一直阻塞到有消息才會(huì)繼續(xù)
?
?
總結(jié)
以上是生活随笔為你收集整理的分布式消息中间件-Rocketmq的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: spring WebSocket详解
- 下一篇: 20190212