消息发送和接收基本应用
添加jar包依賴?
<dependency>?<groupId>org.apache.rocketmq</groupId>?<artifactId>rocketmq-client</artifactId>?<version>4.5.2</version>? </dependency>?生產(chǎn)者?
public class RocketMqProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {//事務(wù)消息的時(shí)候會(huì)用到DefaultMQProducer producer=new DefaultMQProducer("gp_producer_group");producer.setNamesrvAddr("192.168.13.102:9876"); //它會(huì)從命名服務(wù)器上拿到broker的地址producer.start();int num=0;while(num<20){num++;//Topic//tags -> 標(biāo)簽 (分類) -> (篩選)Message message=new Message("gp_test_topic","TagA",("Hello , RocketMQ:"+num).getBytes());//消息路由策略producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {return list.get(0);}},"key-"+num);}} }SendResult中,有一個(gè)sendStatus狀態(tài),表示消息的發(fā)送狀態(tài)。一共有四種狀態(tài)
1. FLUSH_DISK_TIMEOUT : 表示沒有在規(guī)定時(shí)間內(nèi)完成刷盤(需要Broker 的刷盤策Ill創(chuàng)立設(shè)置成SYNC_FLUSH 才會(huì)報(bào)這個(gè)錯(cuò)誤) 。
2. FLUSH_SLAVE_TIMEOUT :表示在主備方式下,并且Broker 被設(shè)置成SYNC_MASTER 方式,沒有在設(shè)定時(shí)間內(nèi)完成主從同步。
3. SLAVE_NOT_AVAILABLE : 這個(gè)狀態(tài)產(chǎn)生的場(chǎng)景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方式下,并且Broker 被設(shè)置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。
4. SEND OK :表示發(fā)送成功,發(fā)送成功的具體含義,比如消息是否已經(jīng)被存儲(chǔ)到磁盤?消息是否被同步到了Slave 上?消息在Slave 上是否被寫入磁盤?需要結(jié)合所配置的刷盤策略、主從策略來定。這個(gè)狀態(tài)還可以簡(jiǎn)單理解為,沒有發(fā)生上面列出的三個(gè)問題狀態(tài)就是SEND OK?
消費(fèi)者?
consumerGroup:位于同一個(gè)consumerGroup中的consumer實(shí)例和producerGroup中的各個(gè)produer實(shí)例承擔(dān)的角色類似;同一個(gè)group中可以配置多個(gè)consumer,可以提高消費(fèi)端的并發(fā)消費(fèi)能力以及容災(zāi)
和kafka一樣,多個(gè)consumer會(huì)對(duì)消息做負(fù)載均衡,意味著同一個(gè)topic下的不同messageQueue會(huì)分發(fā)給同一個(gè)group中的不同consumer。
同時(shí),如果我們希望消息能夠達(dá)到廣播的目的,那么只需要把consumer加入到不同的group就行。
?
RocketMQ提供了兩種消息消費(fèi)模型,一種是pull主動(dòng)拉去,另一種是push,被動(dòng)接收。但實(shí)際上RocketMQ都是pull模式,只是push在pull模式上做了一層封裝,也就是pull到消息以后觸發(fā)業(yè)務(wù)消費(fèi)者注冊(cè)到這里的callback. RocketMQ是基于長(zhǎng)輪訓(xùn)來實(shí)現(xiàn)消息的pull?
nameServer的地址:name server地址,用于獲取broker、topic信息
public class RocketMqConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("gp_consumer_group");consumer.setNamesrvAddr("192.168.13.102:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("gp_test_topic","*");/*consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("Receive Message: "+list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //簽收}});*/consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {MessageExt messageExt=list.get(0);//TODO --// Throw Exceptio// 重新發(fā)送該消息// DLQ(通用設(shè)計(jì))if(messageExt.getReconsumeTimes()==3){ //消息重發(fā)了三次//持久化 消息記錄表return ConsumeOrderlyStatus.SUCCESS; //簽收}return ConsumeOrderlyStatus.SUCCESS; //簽收}});consumer.start();} }?
?
總結(jié)
以上是生活随笔為你收集整理的消息发送和接收基本应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ安装内存不足的问题
- 下一篇: rocketmq控制台安装