日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式消息通信ActiveMQ原理-消费消息策略-笔记

發布時間:2023/12/15 编程问答 41 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式消息通信ActiveMQ原理-消费消息策略-笔记 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

消息消費流程圖

消費端消費消息的原理

  • 我們通過上一節課的講解,知道有兩種方法可以接收消息,
    • 一種是使用同步阻塞的MessageConsumer#receive方法。
    • 另一種是使用消息監聽器MessageListener。
  • 這里需要注意的是,在同一個session下,這兩者不能同時工作,
    • 也就是說不能針對不同消息采用不同的接收方式。
    • 否則會拋出異常。
  • 至于為什么這么做,最大的原因還是在事務性會話中,兩種消費模式的事務不好管控

ActiveMQMessageConsumer.receive

  • 消費端同步接收消息的源碼入口
public Message receive() throws JMSException {checkClosed();checkMessageListener(); //檢查receive和MessageListener是否同時配置在當前的會話中sendPullCommand(0); //如果PrefetchSizeSize為0并且unconsumerMessage為空,則發起pull命令MessageDispatch md = dequeue(-1); //從unconsumerMessage出隊列獲取消息if (md == null) {return null;}beforeMessageIsConsumed(md);afterMessageIsConsumed(md, false); //發送ack給到brokerreturn createActiveMQMessage(md);//獲取消息并返回}

sendPullCommand

  • 發送pull命令從broker上獲取消息,前提是prefetchSize=0并且unconsumedMessages為空。
  • unconsumedMessage表示未消費的消息,這里面預讀取的消息大小為prefetchSize的值
protected void sendPullCommand(long timeout) throws JMSException {clearDeliveredList();if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {MessagePull messagePull = new MessagePull();messagePull.configure(info);messagePull.setTimeout(timeout);session.asyncSendPacket(messagePull); //向服務端異步發送messagePull指令}}

clearDeliveredList

  • 在上面的sendPullCommand方法中,會先調用clearDeliveredList方法,
    • 主要用來清理已經分發的消息鏈表deliveredMessages
      • deliveredMessages,存儲分發給消費者但還為應答的消息鏈表
      • ? 如果session是事務的,則會遍歷deliveredMessage中的消息放入到previouslyDeliveredMessage中來做重發
      • ? 如果session是非事務的,根據ACK的模式來選擇不同的應答操作
private void clearDeliveredList() {if (clearDeliveredList) {synchronized (deliveredMessages) {if (clearDeliveredList) {if (!deliveredMessages.isEmpty()) {if (session.isTransacted()) {if (previouslyDeliveredMessages == null) {previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,Boolean>(session.getTransactionContext().getTransactionId());}for (MessageDispatch delivered : deliveredMessages) {previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(), false);}LOG.debug("{} tracking existing transacted {} delivered list ({}) on transport interrupt",getConsumerId(), previouslyDeliveredMessages.transactionId,deliveredMessages.size());} else {if (session.isClientAcknowledge()) {LOG.debug("{} rolling back delivered list ({}) on transport interrupt", getConsumerId(), deliveredMessages.size());// allow redeliveryif (!this.info.isBrowser()) {for (MessageDispatch md : deliveredMessages) {this.session.connection.rollbackDuplicate(this,md.getMessage());}}}LOG.debug("{} clearing delivered list ({}) on transport interrupt",getConsumerId(), deliveredMessages.size());deliveredMessages.clear();pendingAck = null;}}clearDeliveredList = false;}}}}

dequeue

  • 從unconsumedMessage中取出一個消息,
  • 在創建一個消費者時,就會未這個消費者創建一個為消費的消息通道,這個通道分為兩種,
    • 一種是簡單優先級隊列分發通道SimplePriorityMessageDispatchChannel ;
    • 另一種是先進先出的分發通道FifoMessageDispatchChannel.
  • 至于為什么要存在這樣一個消息分發通道,大家可以想象一下,
    • 如果消費者每次去消費完一個消息以后再去broker拿一個消息,效率是比較低的。
    • 所以通過這樣的設計可以允許session能夠一次性將多條消息分發給一個消費者。
    • 默認情況下對于queue來說,prefetchSize的值是1000

beforeMessageIsConsumed

  • 這里面主要是做消息消費之前的一些準備工作,
  • 如果ACK類型不是DUPS_OK_ACKNOWLEDGE或者隊列模式(簡單來說就是除了Topic和DupAck這兩種情況),
    • 所有的消息先放到deliveredMessages鏈表的開頭。
  • 并且如果當前是事務類型的會話,
    • 則判斷transactedIndividualAck,如果為true,表示單條消息直接返回ack。
    • 否則,調用ackLater,批量應答,
      • client端在消費消息后暫且不發送ACK,而是把它緩存下來(pendingACK),
        • 等到這些消息的條數達到一定閥值時,只需要通過一個ACK指令把它們全部確認;
        • 這比對每條消息都逐個確認,在性能上要提高很多
private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {md.setDeliverySequenceId(session.getNextDeliveryId());lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();if (!isAutoAcknowledgeBatch()) {synchronized(deliveredMessages) {deliveredMessages.addFirst(md);}if (session.getTransacted()) {if (transactedIndividualAck) {immediateIndividualTransactedAck(md);} else {ackLater(md, MessageAck.DELIVERED_ACK_TYPE);}}}}

afterMessageIsConsumed

  • 這個方法的主要作用是執行應答操作,這里面做以下幾個操作
    • ? 如果消息過期,則返回消息過期的ack
    • ? 如果是事務類型的會話,則不做任何處理
    • ? 如果是AUTOACK或者(DUPS_OK_ACK且是隊列),并且是優化ack操作,則走批量確認ack
    • ? 如果是DUPS_OK_ACK,則走ackLater邏輯
    • ? 如果是CLIENT_ACK,則執行ackLater
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throwsJMSException {if (unconsumedMessages.isClosed()) {return;}if (messageExpired) {acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);stats.getExpiredMessageCount().increment();} else {stats.onMessage();if (session.getTransacted()) {// Do nothing.} else if (isAutoAcknowledgeEach()) {if (deliveryingAcknowledgements.compareAndSet(false, true)) {synchronized (deliveredMessages) {if (!deliveredMessages.isEmpty()) {if (optimizeAcknowledge) {ackCounter++;// AMQ-3956 evaluate both expired and normal msgs as// otherwise consumer may get stalledif (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)|| (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp +optimizeAcknowledgeTimeOut))) {MessageAck ack =makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);if (ack != null) {deliveredMessages.clear();ackCounter = 0;session.sendAck(ack);optimizeAckTimestamp = System.currentTimeMillis();}// AMQ-3956 - as further optimization send// ack for expired msgs when there are any.// This resets the deliveredCounter to 0 so that// we won't sent standard acks with every msg just// because the deliveredCounter just below// 0.5 * prefetch as used in ackLater()if (pendingAck != null && deliveredCounter > 0) {session.sendAck(pendingAck);pendingAck = null;deliveredCounter = 0;}}} else {MessageAck ack =makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);if (ack != null) {deliveredMessages.clear();session.sendAck(ack);}}}}deliveryingAcknowledgements.set(false);}} else if (isAutoAcknowledgeBatch()) {ackLater(md, MessageAck.STANDARD_ACK_TYPE);} else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {boolean messageUnackedByConsumer = false;synchronized (deliveredMessages) {messageUnackedByConsumer = deliveredMessages.contains(md);}if (messageUnackedByConsumer) {ackLater(md, MessageAck.DELIVERED_ACK_TYPE);}} else {throw new IllegalStateException("Invalid session state.");}}}

?

轉載于:https://my.oschina.net/u/3847203/blog/2989560

總結

以上是生活随笔為你收集整理的分布式消息通信ActiveMQ原理-消费消息策略-笔记的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。