日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

为什么QueueingConsumer会被Deprecated?

發布時間:2024/4/11 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 为什么QueueingConsumer会被Deprecated? 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。

歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/why-is-queueingconsumer-marked-deprecated/


QueueingConsumer在Rabbitmq客戶端3.x版本中用的如火如荼,但是在4.x版本開初就被標記為@Deprecated,這是為什么呢?本文就此展開探討。

在我的博文《RabbitMQ之Consumer消費模式(Push & Pull)》中講到,Consumer的消費模式有Pull 和 Push兩種,而經常用到的就是Push模式,Push模式在3.x的用法demo如下:

QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicQos(1); channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [X] Received '" + message + "'");channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);break; }

在官方文檔中推薦使用繼承DefaultConsumer的方式:

boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{String routingKey = envelope.getRoutingKey();String contentType = properties.getContentType();long deliveryTag = envelope.getDeliveryTag();// (process the message components here ...)channel.basicAck(deliveryTag, false);} });

在源碼注釋中有關QueueingConsumer的介紹有這樣一段:

QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers ware made on the Connection’s thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the Client would deadlock.

QueuingConsumer provided client code with an easy way to obviate the problem by queueing incoming messages and processing them on a separate, application-managed thread.

The threading behaviour of Connection and Channel has been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel holding up Consumers on another and it also prevents recursive calls from deadlocking the client.

As such, it is now safe to implement Consumer directly of to extend DefaultConsumer and QueueingConsumer is a lot less relevant.

上面提及了兩個drawbacks:

  • the Consumer could stall the processing of all Channels on the Connection. =>QueueingConsumer會拖累Connection的所有Channels的操作
  • if a Consumer made a recursive synchronous call into its Channel the Client would deadlock.=>同步遞歸調用時會產生死鎖
  • 對于這兩句簡單的言辭,博主沒有停下追求真理的腳步,既而去github上發問,當我咨詢rabbitmq-java-client的作者時(issue @265),他是這么回復的:

    Search rabbitmq-users archives. That consumer implementation was merely a workaround for the consumer operation dispatch deficiency that no longer exists. It has significant limitations in that automatic connection recovery does not support it and when deliveries happen faster than consumers actually process them, its internal j.u.c. queue data structure can grow very large. It has been deprecated for years prior to the removal.

    上面提及的rabbitmq-users的鏈接是:https://groups.google.com/forum/#!forum/rabbitmq-users,當然在我大天朝是訪問不了的。博主的翻墻軟件也失效了,就沒法search,有興趣的小伙伴search到的話麻煩告知下(下方留言,私信,或者留下你的資料地址~)。不過作者也提交了兩點:1. automatic connection recovery不支持QueueingConsumer的這種形式;2. 內存溢出問題。

    對于QueueingConsumer的內存溢出問題,我在博文《[九]RabbitMQ-客戶端源碼之Consumer》中講到QueueingConsumer內部其實是一個LinkBlockingQueue,它將從broker端接受到的信息先暫存到這個LinkBlockingQueue中,然后消費端程序在從這個LinkBlockingQueue中take出消息。試下一下,如果我們不take消息或者說take的非常慢,那么LinkBlockingQueue中的消息就會越來越多,最終造成內存溢出。

    這里我們來看一段英文介紹: You have a queue in RabbitMQ. You have some clients consuming from that queue. If you don’t set a Qos setting at all (Basic.Qos), then RabbitMQ will push all the queue’s messages to the client as fast as the network and the clients will allow. 也就是說,如果由于某些原因,隊列中堆積了比較多的消息,就可能導致Consumer內存溢出卡死,于是發生惡性循環,隊列消息不斷堆積得不到消化,徹底地悲劇了。其實這個問題可以通過設置Basic.Qos來很好的解決。

    博主這里實驗了下,先往一個queue里發送200MB+的消息,然后進行消費:

    QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, false, consumer);

    在client端發送Basic.Consume幀,并設置回調函數為QueueingConsumer之后,并不真正消費QueueingConsumer中的LinkedBlockingQueue中的內容,通過JVisualVM可以看到堆內存的變化,如下圖所示:

    可以看到堆內存一直在增加,博主這里只測試了發送200+MB的消息,如果發送的更多,那么這個堆會變得更大,直到OutOfMemory。

    在stackoverflow上也有關于QueueingConsumer的疑問,有人說QueueingConsumer不是event-driven的,也有人提及了內存溢出的問題。看來QueueingConsumer的毛病真的不少,都推薦使用繼承DefaultConsumer的方式進行消費。

    如果博主后面搜集到更多的證據,也會在本博文中更新相關的內容。
    如果你有相關的資料,也可以留言分享一下,大家互相促進~~


    參考資料

  • RabbitMQ之Consumer消費模式(Push & Pull)
  • [九]RabbitMQ-客戶端源碼之Consumer
  • RabbitMQ-api-guide
  • 解決RabbitMQ隊列超長QueueingConsumer導致JVM內存溢出的問題
  • 歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/why-is-queueingconsumer-marked-deprecated/


    歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。


    總結

    以上是生活随笔為你收集整理的为什么QueueingConsumer会被Deprecated?的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。