漫游Kafka实现篇之消息和日志
原文地址:http://blog.csdn.net/honglei915/article/details/37760631
Kafka視頻教程同步首發,歡迎觀看!
消息格式
日志
一個叫做“my_topic”且有兩個分區的的topic,它的日志有兩個文件夾組成,my_topic_0和my_topic_1,每個文件夾里放著具體的數據文件,每個數據文件都是一系列的日志實體,每個日志實體有一個4個字節的整數N標注消息的長度,后邊跟著N個字節的消息。每個消息都可以由一個64位的整數offset標注,offset標注了這條消息在發送到這個分區的消息流中的起始位置。每個日志文件的名稱都是這個文件第一條日志的offset.所以第一個日志文件的名字就是00000000000.kafka.所以每相鄰的兩個文件名字的差就是一個數字S,S差不多就是配置文件中指定的日志文件的最大容量。
消息的格式都由一個統一的接口維護,所以消息可以在producer,broker和consumer之間無縫的傳遞。存儲在硬盤上的消息格式如下所示:
消息長度: 4 bytes (value: 1+4+n) 版本號: 1 byte CRC校驗碼: 4 bytes 具體的消息: n bytes
寫操作
消息被不斷的追加到最后一個日志的末尾,當日志的大小達到一個指定的值時就會產生一個新的文件。對于寫操作有兩個參數,一個規定了消息的數量達到這個值時必須將數據刷新到硬盤上,另外一個規定了刷新到硬盤的時間間隔,這對數據的持久性是個保證,在系統崩潰的時候只會丟失一定數量的消息或者一個時間段的消息。
讀操作
讀操作需要兩個參數:一個64位的offset和一個S字節的最大讀取量。S通常比單個消息的大小要大,但在一些個別消息比較大的情況下,S會小于單個消息的大小。這種情況下讀操作會不斷重試,每次重試都會將讀取量加倍,直到讀取到一個完整的消息。可以配置單個消息的最大值,這樣服務器就會拒絕大小超過這個值的消息。也可以給客戶端指定一個嘗試讀取的最大上限,避免為了讀到一個完整的消息而無限次的重試。
在實際執行讀取操縱時,首先需要定位數據所在的日志文件,然后根據offset計算出在這個日志中的offset(前面的的offset是整個分區的offset),然后在這個offset的位置進行讀取。定位操作是由二分查找法完成的,Kafka在內存中為每個文件維護了offset的范圍。
下面是發送給consumer的結果的格式:
刪除
日志管理器允許定制刪除策略。目前的策略是刪除修改時間在N天之前的日志(按時間刪除),也可以使用另外一個策略:保留最后的N GB數據的策略(按大小刪除)。為了避免在刪除時阻塞讀操作,采用了copy-on-write形式的實現,刪除操作進行時,讀取操作的二分查找功能實際是在一個靜態的快照副本上進行的,這類似于Java的CopyOnWriteArrayList。
可靠性保證
日志文件有一個可配置的參數M,緩存超過這個數量的消息將被強行刷新到硬盤。一個日志矯正線程將循環檢查最新的日志文件中的消息確認每個消息都是合法的。合法的標準為:所有文件的大小的和最大的offset小于日志文件的大小,并且消息的CRC32校驗碼與存儲在消息實體中的校驗碼一致。如果在某個offset發現不合法的消息,從這個offset到下一個合法的offset之間的內容將被移除。
有兩種情況必須考慮:1,當發生崩潰時有些數據塊未能寫入。2,寫入了一些空白數據塊。第二種情況的原因是,對于每個文件,操作系統都有一個inode(inode是指在許多“類Unix文件系統”中的一種數據結構。每個inode保存了文件系統中的一個文件系統對象,包括文件、目錄、大小、設備文件、socket、管道, 等等),但無法保證更新inode和寫入數據的順序,當inode保存的大小信息被更新了,但寫入數據時發生了崩潰,就產生了空白數據塊。CRC校驗碼可以檢查這些塊并移除,當然因為崩潰而未寫入的數據塊也就丟失了。
總結
以上是生活随笔為你收集整理的漫游Kafka实现篇之消息和日志的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 漫游Kafka实战篇之客户端编程实例
- 下一篇: 漫游Kafka实现篇之分布式