消费者广播模式和负载均衡模式
生活随笔
收集整理的這篇文章主要介紹了
消费者广播模式和负载均衡模式
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
消費消息
1)負載均衡模式
消費者采用負載均衡方式消費消息,多個消費者共同消費隊列消息,每個消費者處理的消息不同 ?
2)廣播模式
消費者采用廣播的方式消費消息,每個消費者消費的消息都是相同的
package com.leon.mq.rocketmq.base.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** 消息的接受者*/ public class Consumer {public static void main(String[] args) throws Exception {//1.創建消費者Consumer,制定消費者組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.訂閱主題Topic和Tagconsumer.subscribe("base", "*");//設定消費模式:負載均衡|廣播模式consumer.setMessageModel(MessageModel.BROADCASTING);//4.設置回調函數,處理消息consumer.registerMessageListener(new MessageListenerConcurrently() {//接受消息內容@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//5.啟動消費者consumerconsumer.start();} }?
總結
以上是生活随笔為你收集整理的消费者广播模式和负载均衡模式的全部內容,希望文章能夠幫你解決所遇到的問題。