Kafka日志清理之Log Compaction
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/log-compaction-of-kafka-log-retention/
在上一篇文章《Kafka日志清理之Log Deletion》中介紹了日志清理的方式之一——日志刪除,本文承接上篇,主要來介紹Log Compaction。
Kafka中的Log Compaction是指在默認的日志刪除(Log Deletion)規則之外提供的一種清理過時數據的方式。如下圖所示,Log Compaction對于有相同key的的不同value值,只保留最后一個版本。如果應用只關心key對應的最新value值,可以開啟Kafka的日志清理功能,Kafka會定期將相同key的消息進行合并,只保留最新的value值。
(圖1)
有很多中文資料會把Log Compaction翻譯為“日志壓縮”,筆者認為不夠妥帖,壓縮應該是指Compression,在Kafka中消息可以采用GZip、Snappy、LZ4等壓縮方式進行壓縮,如果把Log Compaction翻譯為日志壓縮,容易讓人和消息壓縮(Message Compression)產生關聯,而實則是兩個不同的概念。英文“Compaction”可以直譯為“壓緊、壓實”,如果這里將Log Compaction直譯為“日志壓緊”或者“日志壓實”又未免太過生硬??紤]到“日志壓縮”的說法已經廣為接受,筆者這里勉強接受此種說法,不過文中盡量直接使用英文Log Compaction來表示日志壓縮。讀者在遇到類似“壓縮”的字眼之時需格外注意這個壓縮是具體指日志壓縮(Log Compaction)還是指消息壓縮(Message Compression)。
Log Compaction執行前后,日志分段中的每條消息的偏移量和寫入時的保持一致。Log Compaction會生成新的日志分段文件,日志分段中每條消息的物理位置會重新按照新文件來組織。Log Compaction執行過后的偏移量不再是連續的,不過這并不影響日志的查詢。
Kafka的Log Compaction可以類比于Redis的SNAPSHOTTING的持久化模式。試想一下,如果一個系統使用Kafka來保存狀態,每次有狀態變更都會將其寫入Kafka中。在某一時刻此系統異常崩潰,進而在恢復時通過讀取Kafka中的消息來恢復其應有的狀態,那么此系統關心的是它原本的最新狀態而不是歷史時刻中的每一個狀態。如果Kafka的日志保存策略是日志刪除(Log Deletion),那么系統勢必要一股腦的讀取Kafka中的所有數據來恢復,而如果日志保存策略是Log Compaction,那么可以減少數據的加載量進而加快系統的恢復速度。Log Compaction在某些應用場景下可以簡化技術棧,提高系統整體的質量。
我們知道可以通過配置log.dir或者log.dirs參數來設置Kafka日志的存放目錄,而對于每一個日志目錄下都有一個名為“cleaner-offset-checkpoint”的文件,這個文件就是清理檢查點文件,用來記錄每個主題的每個分區中已清理的偏移量。通過清理檢查點文件可以將日志文件(Log)分成兩個部分,參考下圖,通過檢查點cleaner checkpoint來劃分出一個已經清理過的clean部分和一個還未清理過的dirty部分。在日志清理的同時,客戶端也會讀取日志。dirty部分的消息偏移量是逐一遞增的,而clean部分的消息偏移量是斷續的,如果客戶端總能趕上dirty部分,它就能讀取到日志的所有消息,反之,就不可能讀到全部的消息。
(圖2)
上圖中firstDirtyOffset(與cleaner checkpoint相等)表示dirty部分的起始偏移量,而firstUncleanableOffset為dirty部分的截止偏移量,整個dirty部分的偏移量范圍為[firstDirtyOffset, firstUncleanableOffset),注意這里是左閉右開區間。為了避免當前活躍的日志分段activeSegment成為熱點文件,activeSegment不會參與Log Compaction的操作。同時Kafka支持通過參數log.cleaner.min.compaction.lag.ms(默認值為0)來配置消息在被清理前的最小保留時間,默認情況下firstUncleanableOffset等于activeSegment的baseOffset。
注意Log Compaction是針對key的,所以在使用時應注意每個消息的key值不為null。每個broker會啟動log.cleaner.thread(默認值為1)個日志清理線程負責執行清理任務,這些線程會選擇“污濁率”最高的日志文件進行清理。用cleanBytes表示clean部分的日志占用大小,dirtyBytes表示dirty部分的日志占用大小,那么這個日志的污濁率(dirtyRatio)為:
dirtyRatio = dirtyBytes / (cleanBytes + dirtyBytes)為了防止日志不必要的頻繁清理操作,Kafka還使用了參數log.cleaner.min.cleanable.ratio(默認值為0.5)來限定可進行清理操作的最小污濁率。
Kafka中用于保存消費者消費位移的主題“__consumer_offsets”使用的就是Log Compaction策略。
這里我們已經知道了怎樣選擇合適的日志文件做清理操作,然而我們怎么對日志文件中消息的key進行篩選操作呢?Kafka中的每個日志清理線程會使用一個名為“SkimpyOffsetMap”的對象來構建key與offset的映射關系的哈希表。日志清理需要遍歷兩次日志文件,第一次遍歷把每個key的哈希值和最后出現的offset都保存在SkimpyOffsetMap中,映射模型如下圖所示。第二次遍歷檢查每個消息是否符合保留條件,如果符合就保留下來,否則就會被清理掉。假設一條消息的offset為O1,這條消息的key在SkimpyOffsetMap中所對應的offset為O2,如果O1>=O2即為滿足保留條件。
(圖3)
默認情況下SkimpyOffsetMap使用MD5來計算key的哈希值,占用空間大小為16B,根據這個哈希值來從SkimpyOffsetMap中找到對應的槽位,如果發生沖突則用線性探測法處理。為了防止哈希沖突過于頻繁,我們也可以通過broker端參數log.cleaner.io.buffer.load.factor(默認值為0.9)來調整負載因子。偏移量占用空間大小為8B,故一個映射項占用大小為24B。每個日志清理線程的SkimpyOffsetMap的內存占用大小為log.cleaner.dedupe.buffer.size / log.cleaner.thread,默認值為 = 128MB/1 = 128MB。所以默認情況下SkimpyOffsetMap可以保存128MB * 0.9 /24B ≈ 5033164個key的記錄。假設每條消息的大小為1KB,那么這個SkimpyOffsetMap可以用來映射4.8GB的日志文件,而如果有重復的key,那么這個數值還會增大,整體上來說SkimpyOffsetMap極大的節省了內存空間且非常高效。
“SkimpyOffsetMap”這個取名也很有意思,“Skimpy”可以直譯為“不足的”,可以看出它最初的設計者也認為這種實現不夠嚴謹。如果遇到兩個不同的key但哈希值相同的情況,那么其中一個key所對應的消息就會丟失。雖然說MD5這類摘要算法的沖突概率非常小,但根據墨菲定律,任何一個事件,只要具有大于0的幾率,就不能假設它不會發生,所以在使用Log Compaction策略時要注意這一點。
Log Compaction會為我們保留key相應的最新value值,那么當我們需要刪除一個key怎么辦?Kafka中提供了一個墓碑消息(tombstone)的概念,如果一條消息的key不為null,但是其value為null,那么此消息就是墓碑消息。日志清理線程發現墓碑消息時會先進行常規的清理,并保留墓碑消息一段時間。墓碑消息的保留條件是當前墓碑消息所在的日志分段的最近修改時間lastModifiedTime大于deleteHorizonMs,參考圖2,這個deleteHorizonMs的計算方式為clean部分中最后一個日志分段的最近修改時間減去保留閾值deleteRetionMs(通過broker端參數log.cleaner.delete.retention.ms配置,默認值為86400000,即24小時)的大小,即:
deleteHorizonMs = clean部分中最后一個LogSegment的lastModifiedTime - deleteRetionMs所以墓碑消息的保留條件為:
所在LogSegment的lastModifiedTime > deleteHorizonMs => 所在LogSegment的lastModifiedTime > clean部分中最后一個LogSegment的lastModifiedTime - deleteRetionMs => 所在LogSegment的lastModifiedTime + deleteRetionMs > clean部分中最后一個LogSegment的lastModifiedTime (可以對照圖2中的deleteRetionMs所標記的位置去理解)Log Compaction執行過后的日志分段的大小會比原先的日志分段的要小,為了防止出現太多的小文件,Kafka在實際清理過程中并不對單個的日志分段進行單獨清理,而是會將日志文件中offset從0至firstUncleanableOffset的所有日志分段進行分組,每個日志分段只屬于一組,分組策略為:按照日志分段的順序遍歷,每組中日志分段的占用空間大小之和不超過segmentSize(可以通過broker端參數log.segments.bytes設置,默認值為1GB),且對應的索引文件占用大小之和不超過maxIndexSize(可以通過broker端參數log.index.interval.bytes設置,默認值為10MB)。同一個組的多個日志分段清理過后,只會生成一個新的日志分段。
(圖4)
參考上圖,假設所有的參數配置都為默認值,在Log Compaction之前checkpoint的初始值為0。執行第一次Log Compaction之后,每個非活躍的日志分段的大小都有所縮減,checkpoint的值也有所變化。執行第二次Log Compaction時會將組隊成[0.4GB, 0.4GB]、[0.3GB, 0.7GB]、[0.3GB]、[1GB]這4個分組,并且從第二次Log Compaction開始還會涉及墓碑消息的清除。同理,第三次Log Compaction過后的情形可參考上圖尾部。Log Compaction過程中會將對每個日志分組中需要保留的消息拷貝到一個以“.clean”為后綴的臨時文件中,此臨時文件以當前日志分組中第一個日志分段的文件名命名,例如:00000000000000000000.log.clean。Log Compaction過后將“.clean”的文件修改為以“.swap”后綴的文件,例如:00000000000000000000.log.swap,然后刪除掉原本的日志文件,最后才把文件的“.swap”后綴去掉,整個過程中的索引文件的變換也是如此,至此一個完整Log Compaction操作才算完成。
以上是整個日志壓縮(Log Compaction)過程的詳解,讀者需要注意將日志壓縮和日志刪除區分開,日志刪除是指清除整個日志分段,而日志壓縮是針對相同key的消息的合并清理。
歡迎跳轉到本文的原文鏈接:https://honeypps.com/mq/log-compaction-of-kafka-log-retention/
歡迎支持筆者新作:《深入理解Kafka:核心設計與實踐原理》和《RabbitMQ實戰指南》,同時歡迎關注筆者的微信公眾號:朱小廝的博客。
總結
以上是生活随笔為你收集整理的Kafka日志清理之Log Compaction的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka日志清理之Log Deleti
- 下一篇: Kafka参数broker.id详解