顺序消息发送者
package com.leon.mq.rocketmq.order;import java.util.ArrayList;
import java.util.List;/*** 訂單構(gòu)建者*/
public class OrderStep {private long orderId;private String desc;public long getOrderId() {return orderId;}public void setOrderId(long orderId) {this.orderId = orderId;}public String getDesc() {return desc;}public void setDesc(String desc) {this.desc = desc;}@Overridepublic String toString() {return "OrderStep{" +"orderId=" + orderId +", desc='" + desc + '\'' +'}';}public static List<OrderStep> buildOrders() {// 1039L : 創(chuàng)建 付款 推送 完成// 1065L : 創(chuàng)建 付款// 7235L :創(chuàng)建 付款List<OrderStep> orderList = new ArrayList<OrderStep>();OrderStep orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("創(chuàng)建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("創(chuàng)建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("創(chuàng)建");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("付款");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1065L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("推送");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(7235L);orderDemo.setDesc("完成");orderList.add(orderDemo);orderDemo = new OrderStep();orderDemo.setOrderId(1039L);orderDemo.setDesc("完成");orderList.add(orderDemo);return orderList;}
}
package com.leon.mq.rocketmq.order;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;public class Producer {public static void main(String[] args) throws Exception {//1.創(chuàng)建消息生產(chǎn)者producer,并制定生產(chǎn)者組名DefaultMQProducer producer = new DefaultMQProducer("group1");//2.指定Nameserver地址producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876");//3.啟動producerproducer.start();//構(gòu)建消息集合List<OrderStep> orderSteps = OrderStep.buildOrders();//發(fā)送消息for (int i = 0; i < orderSteps.size(); i++) {String body = orderSteps.get(i) + "";Message message = new Message("OrderTopic", "Order", "i" + i, body.getBytes());/*** 參數(shù)一:消息對象* 參數(shù)二:消息隊(duì)列的選擇器* 參數(shù)三:選擇隊(duì)列的業(yè)務(wù)標(biāo)識(訂單ID)*/SendResult sendResult = producer.send(message, new MessageQueueSelector() {/**** @param mqs:隊(duì)列集合* @param msg:消息對象* @param arg:業(yè)務(wù)標(biāo)識的參數(shù)* @return*/@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {long orderId = (long) arg;long index = orderId % mqs.size();return mqs.get((int) index);}}, orderSteps.get(i).getOrderId());System.out.println("發(fā)送結(jié)果:" + sendResult);}producer.shutdown();}}
?
總結(jié)