日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

消息发送和接收基本应用

發(fā)布時(shí)間:2024/4/13 编程问答 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 消息发送和接收基本应用 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

添加jar包依賴?

<dependency>?<groupId>org.apache.rocketmq</groupId>?<artifactId>rocketmq-client</artifactId>?<version>4.5.2</version>? </dependency>?

生產(chǎn)者?

public class RocketMqProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {//事務(wù)消息的時(shí)候會(huì)用到DefaultMQProducer producer=new DefaultMQProducer("gp_producer_group");producer.setNamesrvAddr("192.168.13.102:9876"); //它會(huì)從命名服務(wù)器上拿到broker的地址producer.start();int num=0;while(num<20){num++;//Topic//tags -> 標(biāo)簽 (分類) -> (篩選)Message message=new Message("gp_test_topic","TagA",("Hello , RocketMQ:"+num).getBytes());//消息路由策略producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {return list.get(0);}},"key-"+num);}} }

SendResult中,有一個(gè)sendStatus狀態(tài),表示消息的發(fā)送狀態(tài)。一共有四種狀態(tài)

1. FLUSH_DISK_TIMEOUT : 表示沒有在規(guī)定時(shí)間內(nèi)完成刷盤(需要Broker 的刷盤策Ill創(chuàng)立設(shè)置成SYNC_FLUSH 才會(huì)報(bào)這個(gè)錯(cuò)誤) 。

2. FLUSH_SLAVE_TIMEOUT :表示在主備方式下,并且Broker 被設(shè)置成SYNC_MASTER 方式,沒有在設(shè)定時(shí)間內(nèi)完成主從同步。

3. SLAVE_NOT_AVAILABLE : 這個(gè)狀態(tài)產(chǎn)生的場(chǎng)景和FLUSH_SLAVE_TIMEOUT 類似, 表示在主備方式下,并且Broker 被設(shè)置成SYNC_MASTER ,但是沒有找到被配置成Slave 的Broker 。

4. SEND OK :表示發(fā)送成功,發(fā)送成功的具體含義,比如消息是否已經(jīng)被存儲(chǔ)到磁盤?消息是否被同步到了Slave 上?消息在Slave 上是否被寫入磁盤?需要結(jié)合所配置的刷盤策略、主從策略來定。這個(gè)狀態(tài)還可以簡(jiǎn)單理解為,沒有發(fā)生上面列出的三個(gè)問題狀態(tài)就是SEND OK?

消費(fèi)者?

consumerGroup:位于同一個(gè)consumerGroup中的consumer實(shí)例和producerGroup中的各個(gè)produer實(shí)例承擔(dān)的角色類似;同一個(gè)group中可以配置多個(gè)consumer,可以提高消費(fèi)端的并發(fā)消費(fèi)能力以及容災(zāi)

和kafka一樣,多個(gè)consumer會(huì)對(duì)消息做負(fù)載均衡,意味著同一個(gè)topic下的不同messageQueue會(huì)分發(fā)給同一個(gè)group中的不同consumer。

同時(shí),如果我們希望消息能夠達(dá)到廣播的目的,那么只需要把consumer加入到不同的group就行。
?
RocketMQ提供了兩種消息消費(fèi)模型,一種是pull主動(dòng)拉去,另一種是push,被動(dòng)接收。但實(shí)際上RocketMQ都是pull模式,只是push在pull模式上做了一層封裝,也就是pull到消息以后觸發(fā)業(yè)務(wù)消費(fèi)者注冊(cè)到這里的callback. RocketMQ是基于長(zhǎng)輪訓(xùn)來實(shí)現(xiàn)消息的pull?

nameServer的地址:name server地址,用于獲取broker、topic信息

public class RocketMqConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("gp_consumer_group");consumer.setNamesrvAddr("192.168.13.102:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("gp_test_topic","*");/*consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,ConsumeConcurrentlyContext consumeConcurrentlyContext) {System.out.println("Receive Message: "+list);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //簽收}});*/consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {MessageExt messageExt=list.get(0);//TODO --// Throw Exceptio// 重新發(fā)送該消息// DLQ(通用設(shè)計(jì))if(messageExt.getReconsumeTimes()==3){ //消息重發(fā)了三次//持久化 消息記錄表return ConsumeOrderlyStatus.SUCCESS; //簽收}return ConsumeOrderlyStatus.SUCCESS; //簽收}});consumer.start();} }

?

?

總結(jié)

以上是生活随笔為你收集整理的消息发送和接收基本应用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。