第五章 限时订单实战笔记
什么是限時(shí)訂單?在各種電商網(wǎng)站下訂單后會(huì)保留一個(gè)時(shí)間段,時(shí)間段內(nèi)未支付則自動(dòng)將訂單狀態(tài)設(shè)置為已過(guò)期,這種訂單稱之為限時(shí)訂單。
代碼地址:https://gitee.com/hankin_chj/rocketmq-platform.git?(rocket-delay-order)
?
一、如何實(shí)現(xiàn)限時(shí)訂單
1、限時(shí)訂單的流程
電商平臺(tái)都會(huì)包含以下 5 種狀態(tài)。
待付款:代表買(mǎi)家下單了但是還沒(méi)有付款。
待發(fā)貨:代表買(mǎi)家付款了賣(mài)家還沒(méi)有發(fā)貨。
已發(fā)貨:代表賣(mài)家已經(jīng)發(fā)貨并寄出商品了。
已完成:代表買(mǎi)家已經(jīng)確認(rèn)收到貨了。
已關(guān)閉:代表訂單過(guò)期了買(mǎi)家也沒(méi)付款、或者賣(mài)家關(guān)閉了訂單。?
2、限時(shí)訂單實(shí)現(xiàn)的關(guān)鍵
我們可以看到,訂單中的很多狀態(tài)都是可以用戶觸發(fā)的,唯獨(dú)訂單過(guò)期了買(mǎi)家也沒(méi)付款我們需要自動(dòng)的把訂單給關(guān)閉,這個(gè)操作是沒(méi)有用戶或者是人工干預(yù)的,所以限時(shí)訂單的關(guān)鍵就是如何檢查訂單狀態(tài),如果訂單過(guò)期了則把該訂單設(shè)置為關(guān)閉狀態(tài)。
3、輪詢數(shù)據(jù)庫(kù)?
輪詢數(shù)據(jù)庫(kù)在實(shí)現(xiàn)限時(shí)訂單上是可行的,而且實(shí)現(xiàn)起來(lái)很簡(jiǎn)單,寫(xiě)個(gè)定時(shí)器去每隔一段時(shí)間掃描數(shù)據(jù)庫(kù),檢查到訂單過(guò)期了,做適當(dāng)?shù)臉I(yè)務(wù)處理。
但是輪詢會(huì)帶來(lái)什么問(wèn)題?
1)輪詢大部分時(shí)間其實(shí)是在做無(wú)用功,我們假設(shè)一張訂單是45分鐘過(guò)期,每1分鐘我們掃描一次,對(duì)這張訂單來(lái)說(shuō),要掃描45次以后,才會(huì)檢查到這張訂單過(guò)期,這就意味著數(shù)據(jù)庫(kù)的資源(連接,IO)被白白浪費(fèi)了;
2)處理上的不及時(shí),一個(gè)待支付的電影票訂單我們假設(shè)是12:00:35過(guò)期,但是上次掃描的時(shí)間是 12:00:30,那么這個(gè)訂單實(shí)際的過(guò)期時(shí)間是12:01:30,和我本來(lái)的過(guò)期時(shí)間差了55秒鐘。放在業(yè)務(wù)上,會(huì)帶來(lái)什么問(wèn)題?這張電影票,假設(shè)是最后一張,有個(gè)人12:00:55來(lái)買(mǎi)票,買(mǎi)得到嗎?當(dāng)然買(mǎi)不到了。那么這張電影票很有可能就浪費(fèi)了。如果縮短掃描的時(shí)間間隔,第一只能改善不能解決,第二,又會(huì)對(duì)數(shù)據(jù)庫(kù)造成更大的壓力。 那么我們能否有種機(jī)制,不用定時(shí)掃描,當(dāng)訂單到期了,自然通知我們的應(yīng)用去處理這些到期的訂單呢?
4、Java本身的提供的解決方案
java其實(shí)已經(jīng)為我們提供了解決問(wèn)題的方法。我們想要處理限時(shí)支付的問(wèn)題,肯定是要有個(gè)地方保存這些限時(shí)訂單的信息的,意味著我們需要一個(gè)容器,于是我們?cè)贘ava容器中去尋找Map? List? Queue?
看看java為我們提供的容器,我們是個(gè)多線程下的應(yīng)用,會(huì)有多個(gè)用戶同時(shí)下訂單,所以所有并發(fā)不安全的容器首先被排除,并發(fā)安全的容器有哪些?java在阻塞隊(duì)列里為我們提供了一種叫延遲隊(duì)列delayQueue的容器,剛好可以為我們解決問(wèn)題。
DelayQueue:阻塞隊(duì)列(先進(jìn)先出)
1)支持阻塞的插入方法:意思是當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿。
2)支持阻塞的移除方法:意思是在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡Q舆t期滿時(shí)才能從中提取元素(光隊(duì)列里有元素還不行)。
Delayed接口使對(duì)象成為延遲對(duì)象,它使存放在DelayQueue類中的對(duì)象具有了激活日期,該接口強(qiáng)制實(shí)現(xiàn)下列兩個(gè)方法:
? CompareTo(Delayed o):Delayed接口繼承了Comparable接口,因此有了這個(gè)方法,讓元素按激活日期排隊(duì)。
? getDelay(TimeUnit unit):這個(gè)方法返回到激活日期的剩余時(shí)間,時(shí)間單位由單位參數(shù)指定。 阻塞隊(duì)列更多詳情,參考《并發(fā)編程》。
5、架構(gòu)師應(yīng)該多考慮一點(diǎn)
架構(gòu)師在設(shè)計(jì)和實(shí)現(xiàn)系統(tǒng)時(shí)需要考慮些什么?
功能:這個(gè)沒(méi)什么好說(shuō),實(shí)現(xiàn)一個(gè)應(yīng)用,連基本的功能都沒(méi)實(shí)現(xiàn),要這個(gè)應(yīng)用有何用?
高性能:能不能盡快的為用戶提供服務(wù)和能為多少用戶同時(shí)提供服務(wù),性能這個(gè)東西是個(gè)很綜合性的東西,從前端到后端,從架構(gòu)(緩存機(jī)制、異步機(jī)制)到 web 容器、數(shù)據(jù)庫(kù)本身再到虛擬機(jī)到算法、java 代碼、sql語(yǔ)句的編寫(xiě),全部都對(duì)性能有影響。如何提升性能,要建立在充分的性能測(cè)試的基礎(chǔ)上,然后一個(gè)個(gè)的去解決性能瓶頸。對(duì)上面提到的應(yīng)用來(lái)講,我們不想去輪詢數(shù)據(jù)庫(kù),其實(shí)跟性能有非常大的關(guān)系。
高可用:應(yīng)用正確處理業(yè)務(wù),服務(wù)用戶的時(shí)間,這個(gè)時(shí)間當(dāng)然是越長(zhǎng)越好,希望可以7*24小時(shí)。而且哪怕服務(wù)器出現(xiàn)了升級(jí),宕機(jī)等等情況下,能夠以最短的時(shí)間恢復(fù),為用戶繼續(xù)服務(wù),但是實(shí)際過(guò)程中沒(méi)有哪個(gè)網(wǎng)站可以說(shuō)做到100%,不管是Google、FaceBook、阿里、騰訊,一般來(lái)說(shuō)可以做到99.99%的可用性,已經(jīng)是相當(dāng)厲害了,這個(gè)水平大概就是一個(gè)服務(wù)在一年可以做到只有50分鐘不可用。這個(gè)需要技術(shù)、資金、技術(shù)人員的水平和責(zé)任心,還要運(yùn)氣。
高伸縮:伸縮性是指通過(guò)不斷向集群中加入服務(wù)器的手段來(lái)緩解不斷上升的用戶并發(fā)訪問(wèn)壓力和不斷增長(zhǎng)的數(shù)據(jù)存儲(chǔ)需求。就像彈簧一樣掛東西一樣,用戶多,伸一點(diǎn),用戶少,縮一點(diǎn)。衡量架構(gòu)是否高伸縮性的主要標(biāo)準(zhǔn)就是是否可用多臺(tái)服務(wù)器構(gòu)建集群,是否容易向集群中添加新的服務(wù)器。加入新的服務(wù)器后是否可以提供和原來(lái)服務(wù)器無(wú)差別的服務(wù)。集群中可容納的總的服務(wù)器數(shù)量是否有限制。
高擴(kuò)展:的主要標(biāo)準(zhǔn)就是在網(wǎng)站增加新的業(yè)務(wù)產(chǎn)品時(shí),是否可以實(shí)現(xiàn)對(duì)現(xiàn)有產(chǎn)品透明無(wú)影響,不需要任何改動(dòng)或者很少改動(dòng)既有業(yè)務(wù)功能就可以上線新產(chǎn)品。比如購(gòu)買(mǎi)電影票的應(yīng)用,用戶購(gòu)買(mǎi)電影票,現(xiàn)在我們要增加一個(gè)功能,用戶買(mǎi)了票后,隨機(jī)抽取用戶送限量周邊。怎么做到不改動(dòng)用戶下訂單功能的基礎(chǔ)上增加這個(gè)功能。熟悉設(shè)計(jì)模式的同學(xué),應(yīng)該很眼熟,這是設(shè)計(jì)模式中的開(kāi)閉原則(對(duì)擴(kuò)展開(kāi)放,對(duì)修改關(guān)閉)在架構(gòu)層面的一個(gè)原則。
6、從系統(tǒng)可用性角度考慮
應(yīng)用重啟帶來(lái)的問(wèn)題:
保存在Queue中的訂單會(huì)丟失,這些丟失的訂單會(huì)在什么時(shí)候過(guò)期,因?yàn)殛?duì)列里已經(jīng)沒(méi)有這個(gè)訂單了,無(wú)法檢查了,這些訂單就得不到處理了。
已過(guò)期的訂單不會(huì)被處理,在應(yīng)用的重啟階段,可能會(huì)有一部分訂單過(guò)期,這部分過(guò)期未支付的訂單同樣也得不到處理,會(huì)一直放在數(shù)據(jù)庫(kù)里,過(guò)期未支付訂單所對(duì)應(yīng)的資源比如電影票所對(duì)應(yīng)的座位,就不能被釋放出來(lái),讓別的用戶來(lái)購(gòu)買(mǎi)。
解決之道 :在系統(tǒng)啟動(dòng)時(shí)另行處理
7、從系統(tǒng)伸縮性角度考慮
集群化了會(huì)帶來(lái)什么問(wèn)題?應(yīng)用之間會(huì)相互搶奪訂單,特別是在應(yīng)用重啟的時(shí)候,重新啟動(dòng)的那個(gè)應(yīng)用會(huì)把不屬于自己的訂單,也全部加載到自己的隊(duì)列里去,一是造成內(nèi)存的浪費(fèi),二來(lái)會(huì)造成訂單的重復(fù)處理,而且加大了數(shù)據(jù)庫(kù)的壓力。
解決方案:讓?xiě)?yīng)用分區(qū)處理
1)給每臺(tái)服務(wù)器編號(hào),然后在訂單表里登記每條訂單的服務(wù)器編號(hào);
2)更簡(jiǎn)單的,在訂單表里登記每臺(tái)服務(wù)器的IP地址,修改相應(yīng)的sql語(yǔ)句即可。
幾個(gè)問(wèn)題:如果有一臺(tái)服務(wù)器掛了怎么辦?如果是某臺(tái)服務(wù)器下線或者宕機(jī),起不來(lái)怎么搞?這個(gè)還是還是稍微有點(diǎn)麻煩,需要人工干預(yù)一下,手動(dòng)把庫(kù)里的每條訂單數(shù)據(jù)的服務(wù)器編號(hào)改為目前正常的服務(wù)器的編號(hào),不過(guò)也就是一條sql語(yǔ)句的事,然后想辦法讓正常的服務(wù)器進(jìn)行處理(重啟正常的服務(wù)器)。
二、用RocketMQ實(shí)現(xiàn)限時(shí)訂單
引入RocketMQ使用延時(shí)消息,一舉解決我們限時(shí)訂單的伸縮性和擴(kuò)展性問(wèn)題。
1、延時(shí)消息
概念介紹
延時(shí)消息:Producer將消息發(fā)送到消息隊(duì)列RocketMQ服務(wù)端,但并不期望這條消息立馬投遞,而是延遲一定時(shí)間后才投遞到Consumer進(jìn)行消費(fèi),該消息即延時(shí)消息。
適用場(chǎng)景
消息生產(chǎn)和消費(fèi)有時(shí)間窗口要求:比如在電商交易中超時(shí)未支付關(guān)閉訂單的場(chǎng)景,在訂單創(chuàng)建時(shí)會(huì)發(fā)送一條延時(shí)消息。這條消息將會(huì)在30分鐘以后投遞給消費(fèi)者,消費(fèi)者收到此消息后需要判斷對(duì)應(yīng)的訂單是否已完成支付;如支付未完成,則關(guān)閉訂單,如已完成支付則忽略。
2、核心的代碼
整個(gè)代碼見(jiàn)delayOrder包,Git地址:https://gitee.com/hankin_chj/rocketmq-platform.git
2.1、配置部分
<!-- rocketMq生產(chǎn)者配置 -->
<bean id="rocketMQProducer" class="com.chj.service.mq.RocketMQProducer"
?????init-method="init" destroy-method="destroy">
???<property name="producerGroup" value="DelayOrderProducer" />
???<property name="namesrvAddr" value="127.0.0.1:9876" />
</bean>
<!-- 消費(fèi)者監(jiān)聽(tīng) -->
<bean id="messageListeners" class="com.chj.service.mq.MessageListenerImpl"></bean>
<!-- 消費(fèi)者配置 -->
<bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"
?????init-method="start" destroy-method="shutdown">
???<property name="consumerGroup" value="TimeOrderGroup" />
???<property name="namesrvAddr" value="127.0.0.1:9876" />
???<property name="messageModel" value="CLUSTERING" />
???<property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET" />
???<property name="messageListener" ref="messageListeners" />
???<property name="subscription">
??????<map>
?????????<entry key="TimeOrder" value="*" />
??????</map>
???</property>
</bean>
訂單處理的控制器代碼實(shí)現(xiàn):
@Controller public class OrderController {private static final String SUCCESS = "suc";private static final String FAILUER = "failure";@Autowiredprivate SaveOrder saveOrder;@RequestMapping("/index")public String userOrder(){return "order";}//保存訂單(界面生成幾個(gè)訂單)@RequestMapping("/submitOrder")@ResponseBodypublic String saveOrder(@RequestParam("orderNumber")int orderNumber){saveOrder.insertOrders(orderNumber);return SUCCESS;} }2.2、核心代碼實(shí)現(xiàn)
1)保存訂單SaveOrder.java的時(shí)候,作為生產(chǎn)者往消息隊(duì)列里推入訂單,核心RocketMQProducer,這個(gè)類當(dāng)然是要繼承IDelayOrder,同時(shí)也是RocketMQ的生產(chǎn)者。
訂單相關(guān)的服務(wù)SaveOrder.java代碼實(shí)現(xiàn):
@Service public class SaveOrder {private Logger logger = LoggerFactory.getLogger(SaveOrder.class);public final static short UNPAY = 0;public final static short PAYED = 1;public final static short EXPIRED = -1;@Autowiredprivate OrderExpDao orderExpDao;@Autowired@Qualifier("rocketmq")private IDelayOrder delayOrder;/*** 接收前端頁(yè)面參數(shù),生成訂單* @param orderNumber 訂單個(gè)數(shù)*/public void insertOrders(int orderNumber){Random r = new Random();OrderExp orderExp ;for(int i=0;i<orderNumber;i++) {//這個(gè)是設(shè)置延時(shí)消息的屬性//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ?18個(gè)等級(jí)long expire_duration =30;long expireTime =4;orderExp = new OrderExp();String orderNo = "DD00_30S";orderExp.setOrderNo(orderNo);orderExp.setOrderNote("享學(xué)訂單——"+orderNo);orderExp.setOrderStatus(UNPAY);orderExpDao.insertDelayOrder(orderExp,expire_duration);logger.info("保存訂單到DB:"+orderNo);//TODO 這里需要把訂單信息存入RocketMQdelayOrder.orderDelay(orderExp, expireTime);}}@PostConstructpublic void initDelayOrder() {logger.info("系統(tǒng)啟動(dòng),掃描表中過(guò)期未支付的訂單并處理.........");int counts = orderExpDao.updateExpireOrders();logger.info("系統(tǒng)啟動(dòng),處理了表中["+counts+"]個(gè)過(guò)期未支付的訂單!");List<OrderExp> orderList = orderExpDao.selectUnPayOrders();logger.info("系統(tǒng)啟動(dòng),發(fā)現(xiàn)了表中還有["+orderList.size()+"]個(gè)未到期未支付的訂單!推入檢查隊(duì)列準(zhǔn)備到期檢查....");for(OrderExp order:orderList) {long expireTime = order.getExpireTime().getTime()-(new Date().getTime());delayOrder.orderDelay(order, expireTime);}} }消息隊(duì)列的實(shí)現(xiàn)RocketMQProducer:
@Service @Qualifier("rocketmq") public class RocketMQProducer implements IDelayOrder {@Autowiredprivate DlyOrderProcessor processDelayOrder;private Thread takeOrder;private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);private DefaultMQProducer defaultMQProducer;private String producerGroup;private String namesrvAddr;@PostConstructpublic void init() throws MQClientException {this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);defaultMQProducer.setNamesrvAddr(this.namesrvAddr);defaultMQProducer.start();logger.info("rocketMQ初始化生產(chǎn)者完成[producerGroup:" + producerGroup + "]");}@PreDestroypublic void destroy() {defaultMQProducer.shutdown();logger.info("rocketMQ生產(chǎn)者[producerGroup: " + producerGroup + "]已停止");}public DefaultMQProducer getDefaultMQProducer() {return defaultMQProducer;}public void setProducerGroup(String producerGroup) {this.producerGroup = producerGroup;}public void setNamesrvAddr(String namesrvAddr) {this.namesrvAddr = namesrvAddr;}public void orderDelay(OrderExp order, long timeLevel) {try {//TODO 使用Gson序列化Gson gson = new Gson();String txtMsg = gson.toJson(order);//TODO 發(fā)送延時(shí)消息Message msg = new Message("TimeOrder", null, txtMsg.getBytes());//這個(gè)是設(shè)置延時(shí)消息的屬性//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ?18個(gè)等級(jí)msg.setDelayTimeLevel((int)timeLevel);SendResult result = defaultMQProducer.send(msg);if(result.getSendStatus() !=null && result.getSendStatus()== SendStatus.SEND_OK){System.out.println("訂單被推入延遲隊(duì)列,訂單詳情:"+order);logger.info("訂單被推入延遲隊(duì)列,訂單詳情:"+order);}else{logger.error("訂單推入RocketMq失敗,訂單詳情:"+order+"SendStatus:"+result.getSendStatus());}} catch (Exception e) {logger.error("單推入RocketMq失敗,失敗詳情:"+e.toString());}} }2)消息隊(duì)列會(huì)把延時(shí)的訂單發(fā)給消費(fèi)者M(jìn)essageListenerImpl,它是一個(gè)RocketMQ的消費(fèi)者監(jiān)聽(tīng),它來(lái)負(fù)責(zé)檢查訂單是否過(guò)期,有消息過(guò)來(lái),證明消息訂單過(guò)期了,則把訂單狀態(tài)修改為過(guò)期訂單。RocketMQ本身又如何保證可用性和伸縮性?這個(gè)就需要RocketMQ的主從同步(HA機(jī)制)。
處理消息隊(duì)列返回的延時(shí)訂單MessageListenerImpl:
@Service public class MessageListenerImpl implements MessageListenerConcurrently {private Logger logger = LoggerFactory.getLogger(MessageListenerImpl.class);@Autowiredprivate DlyOrderProcessor processDlyOrder;public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {//TODO 使用GSON反序列化String txtMsg = new String(msg.getBody());Gson gson = new Gson();System.out.println("接收到RocketMQ的消息:"+txtMsg);OrderExp order = (OrderExp)gson.fromJson(txtMsg, OrderExp.class);//TODO 修改訂單狀態(tài)為過(guò)期if(order.getId()!=null){processDlyOrder.checkDelayOrder(order);}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 如果沒(méi)有異常會(huì)認(rèn)為都成功消費(fèi)return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }處理過(guò)期訂單的服務(wù):
@Service public class DlyOrderProcessor {private Logger logger = LoggerFactory.getLogger(DlyOrderProcessor.class);@Autowiredprivate OrderExpDao orderExpDao;/*** 檢查數(shù)據(jù)庫(kù)中指定id的訂單的狀態(tài),如果為未支付,則修改為已過(guò)期* */public void checkDelayOrder(OrderExp record) {OrderExp dbOrder = orderExpDao.selectByPrimaryKey(record.getId());if(dbOrder.getOrderStatus()==SaveOrder.UNPAY) {logger.info("訂單【"+record+"】未支付已過(guò)期,需要更改為過(guò)期訂單!");orderExpDao.updateExpireOrder(record.getId());}else {logger.info("已支付訂單【"+record+"】,無(wú)需修改!");}} }?
總結(jié)
以上是生活随笔為你收集整理的第五章 限时订单实战笔记的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: cs224n第二讲:简单的词向量表示:w
- 下一篇: 使用Flickr的图片拼出你的句子