日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

跟Kafka学技术-缓冲池的使用

發布時間:2024/4/11 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 跟Kafka学技术-缓冲池的使用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

點擊上方“朱小廝的博客”,選擇“設為星標”

后臺回復”加群“獲取公眾號專屬群聊入口

作者簡介:黃益明,來自滴滴出行kafka團隊,對Kafka有一年多的研究和實踐,負責滴滴內部云平臺的架構設計和Kafka特性研發工作。

大家都知道Kafka是一個高吞吐的消息隊列,是大數據場景首選的消息隊列,這種場景就意味著發送單位時間消息的量會特別的大,那么Kafka如何做到能支持能同時發送大量消息的呢?

答案是Kafka通過批量壓縮和發送做到的。我們知道消息肯定是放在內存中的,大數據場景消息的不斷發送,內存中不斷存在大量的消息,很容易引起GC,頻繁的GC特別是full gc是會造成“stop the world”,也就是其他線程停止工作等待垃圾回收線程執行,繼而進一步影響發送的速度影響吞吐量,那么Kafka是如何做到優化JVM的GC問題的呢?看完本篇文章你會get到。

Kafka的內存池

下面介紹下Kafka客戶端發送的大致過程,如下圖:

Kafka的kafkaProducer對象是線程安全的,每個發送線程在發送消息時候共用一個kafkaProducer對象來調用發送方法,最后發送的數據根據Topic和分區的不同被組裝進某一個RecordBatch中。發送的數據放入RecordBatch后會被發送線程批量取出組裝成ProduceRequest對象發送給Kafka服務端。可以看到發送數據線程和取數據線程都要跟內存中的RecordBatch打交道,RecordBatch是存儲數據的對象,那么RecordBatch是怎么分配的呢?下面我們看下Kafka的緩沖池結構,如下圖所示:

名詞解釋緩沖池:BufferPool(緩沖池)對象,整個KafkaProducer實例中只有一個BufferPool對象。內存池總大小,它是已使用空間和可使用空間的總和,用totalMemory表示(由buffer.memory配置,默認32M)。

可使用的空間:它包含包括兩個部分,綠色部分代表未申請未使用的部分,用availableMemory表示;黃色部分代表已經申請但沒有使用的部分,用一個ByteBuffer雙端隊列(Deque)表示,在BufferPool中這個隊列叫free,隊列中的每個ByteBuffer的大小用poolableSize表示(由batch.size配置,默認16k),因為每次free申請內存都是以poolableSize為單位申請的,申請poolableSize大小的bytebuffer后用RecordBatch來包裝起來。

已使用空間:代表緩沖池中已經裝了數據的部分。

根據以上介紹,我們可以知道,總的BufferPool大小=已使用空間+可使用空間;free的大小=free.size * poolableSize(poolsize就是單位batch的size)。

數據的分配過程 總的來說是判斷需要存儲的數據的大小是否free里有合適的recordBatch裝得下,如果裝得下則用recordBatch來存儲數據,如果free里沒有空間但是availableMemory+free的大小比需要存儲的數據大(也就是說可使用空間比實際需要申請的空間大),說明可使用空間大小足夠,則會用讓free一直釋放byteBuffer空間直到有空間裝得下要存儲的數據位置,如果需要申請的空間比實際可使用空間大,則內存申請會阻塞直到申請到足夠的內存為止。整個申請過程如下圖:

數據的釋放過程 總的來說有2個入口,釋放過程如下圖:

再來看段申請空間代碼:

//判斷需要申請空間大小,如果需要申請空間大小比batchSize小,那么申請大小就是batchsize,如果比batchSize大,那么大小以實際申請大小為準 int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); //這個過程可以參考圖3 ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

再來段回收的核心代碼:

public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {//只有標準規格(bytebuffer空間大小和poolableSize大小一致的才放入free)if (size == this.poolableSize && size == buffer.capacity()) {//注意這里的buffer是直接reset了,重新reset后可以重復利用,沒有gc問題buffer.clear();//添加進free循環利用this.free.add(buffer);} else {//規格不是poolableSize大小的那么沒有進行重制,但是會把availableMemory增加,代表整個可用內存空間增加了,這個時候buffer的回收依賴jvm的gcthis.availableMemory += size;}//喚醒排在前面的等待線程Condition moreMem = this.waiters.peekFirst();if (moreMem != null)moreMem.signal();} finally {lock.unlock();} }

通過申請和釋放過程流程圖以及釋放空間代碼,我們可以得到一個結論,就是如果用戶申請的數據(發送的消息)大小都是在poolableSize(由batch.size配置,默認16k)以內,并且申請時候free里有空間,那么用戶申請的空間是可以循環利用的空間,可以減少gc,但是其他情況也可能存在直接用堆內存申請空間的情況,存在gc的情況。如何盡量避免呢,如果批量消息里面單個消息都是超過16k,可以考慮調整batchSize大小。

如果沒有使用緩沖池,那么用戶發送的模型是下圖5,由于GC特別是Full GC的存在,如果大量發送,就可能會發生頻繁的垃圾回收,導致的工作線程的停頓,會對整個發送性能,吞吐量延遲等都有影響。

使用緩沖池后,整個使用過程可以縮略為下圖:

總結

Kafka通過使用內存緩沖池的設計,讓整個發送過程中的存儲空間循環利用,有效減少JVM GC造成的影響,從而提高發送性能,提升吞吐量。

想知道更多?描下面的二維碼關注我

后臺回復”加群“獲取公眾號專屬群聊入口

【精彩推薦】

  • 一文講透微服務下如何保證事務的一致性

  • 如何理解Linux中的零拷貝技術

  • 干貨!Java字節碼增強探秘

  • Java Agent初探

  • IO多路復用是什么意思

  • 當我們在談論內存的時候,我們在談論什么 | 干貨

  • 分布式文件系統設計,該從哪些方面考慮

  • 咱們從頭到尾說一次Java垃圾回收

  • Netty、Kafka中的零拷貝技術到底有多牛?

  • JDK14的9大重磅特性

朕已閱?

總結

以上是生活随笔為你收集整理的跟Kafka学技术-缓冲池的使用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。