日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka的消息格式

發(fā)布時(shí)間:2025/4/16 编程问答 36 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka的消息格式 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Commit Log

Kafka儲(chǔ)存消息的文件被它叫做log,按照Kafka文檔的說法是:

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log

這反應(yīng)出來的Kafka的行為是:消息被不斷地append到文件末尾,而且消息是不可變的。

這種行為源于Kafka想要實(shí)現(xiàn)的功能:高吞吐量,多副本,消息持久化。這種簡(jiǎn)單的log形式的文件結(jié)構(gòu)能夠更好地實(shí)現(xiàn)這些功能,不過也會(huì)在其它方面有所欠缺,比如檢索消息的能力。

而Kafka的行為也決定了它的消息格式。對(duì)于Kafka來說,消息的主體部分的格式在網(wǎng)絡(luò)傳輸中和磁盤上是一致的,也就是說消息的主體部分可以直接從網(wǎng)絡(luò)讀取的字節(jié)buffer中寫入到文件(部分情況下),也可以直接從文件中copy到網(wǎng)絡(luò),而不需要在程序中再加工,這有利于降低服務(wù)器端的開銷,以及提高IO速度(比如使用zero-copy的傳輸)。

這也就決定了Kafka的消息格式必須是適于被直接append到文件中的。當(dāng)然啥都可以append到文件后面,問題在于怎么從文件中拆分出來一條條記錄。

記錄的劃分以及消息的格式

對(duì)于日志來說,一條記錄以"\n"結(jié)尾,或者通過其它特定的分隔符分隔,這樣就可以從文件中拆分出一條一條的記錄,不過這種格式更適用于文本,對(duì)于Kafka來說,需要的是二進(jìn)制的格式。所以,Kafka使用了另一種經(jīng)典的格式:在消息前面固定長(zhǎng)度的幾個(gè)字節(jié)記錄下這條消息的大小(以byte記),所以Kafka的記錄格式變成了:

Offset MessageSize Message

消息被以這樣格式append到文件里,在讀的時(shí)候通過MessageSize可以確定一條消息的邊界。

需要注意的是,在Kafka的文檔以及源碼中,消息(Message)并不包括它的offset。Kafka的log是由一條一條的記錄構(gòu)成的,Kafka并沒有給這種記錄起個(gè)專門的名字,但是需要記住的是這個(gè)“記錄”并不等于"Message"。Offset MessageSize Message加在一起,構(gòu)成一條記錄。而在Kafka Protocol中,Message具體的格式為

Message => Crc MagicByte Attributes Key Value ??Crc => int32 ??MagicByte => int8 ??Attributes => int8 ??Key => bytes ??Value => bytes

各個(gè)部分的含義是

Field

Description

Attributes

This byte holds metadata attributes about the message. The lowest 2 bits contain the compression codec used for the message. The other bits should be set to 0.

Crc

The CRC is the CRC32 of the remainder of the message bytes. This is used to check the integrity of the message on the broker and consumer.

Key

The key is an optional message key that was used for partition assignment. The key can be null.

MagicByte

This is a version id used to allow backwards compatible evolution of the message binary format. The current value is 0.

Offset

This is the offset used in kafka as the log sequence number. When the producer is sending messages it doesn't actually know the offset and can fill in any value here it likes.

Value

The value is the actual message contents as an opaque byte array. Kafka supports recursive messages in which case this may itself contain a message set. The message can be null.

?

MessageSet

之所以要強(qiáng)調(diào)記錄與Message的區(qū)別,是為了更好地理解MessageSet的概念。Kafka protocol里對(duì)于MessageSet的定義是這樣的

MessageSet => [Offset MessageSize Message] ??Offset => int64 ??MessageSize => int32

也就是說MessageSet是由多條記錄組成的,而不是消息,這就決定了一個(gè)MessageSet實(shí)際上不需要借助其它信息就可以從它對(duì)應(yīng)的字節(jié)流中切分出消息,而這決定了更重要的性質(zhì):Kafka的壓縮是以MessageSet為單位的。而以MessageSet為單位壓縮,決定了對(duì)于壓縮后的MessageSet,不需要在它的外部記錄這個(gè)MessageSet的結(jié)構(gòu),也就決定了Kafka的消息是可以遞歸包含的,也就是前邊"value"字段的說明“Kafka supports recursive messages in which case this may itself contain a message set"。

具體地說,對(duì)于Kafka來說,可以對(duì)一個(gè)MessageSet做為整體壓縮,把壓縮后得到的字節(jié)數(shù)組作為一條Message的value。于是,Message既可以表示未壓縮的單條消息,也可以表示壓縮后的MessageSet。

壓縮后的消息的讀取

就看Message頭部的Attributes里的壓縮格式標(biāo)識(shí)。說到這個(gè),得說下遞歸包含的事情,理論上,一個(gè)壓縮的的MessageSet里的一個(gè)Message可能會(huì)是另一個(gè)壓縮后的MessageSet,或者包含更深層的MessageSet。但是實(shí)際上,Kafka中的一個(gè)Message最多只含有一個(gè)MessageSet。從Message中讀取MessageSet的邏輯,可以在ByteBufferMessageSet的internalIterator方法中找到:

if(isShallow) { //是否要進(jìn)行深層迭代new MessageAndOffset(newMessage, offset)} else { //如果要深層迭代的話newMessage.compressionCodec match {case NoCompressionCodec =>innerIter = nullnew MessageAndOffset(newMessage, offset) //如果這個(gè)Message沒有壓縮,就直接把它作為一個(gè)Message返回case _ =>innerIter = ByteBufferMessageSet.deepIterator(newMessage) //如果這個(gè)Message采用了壓縮,就對(duì)它進(jìn)行深層迭代if(!innerIter.hasNext)innerIter = nullmakeNext()}}

而ByteBufferMessageSet的deepIterator方法就是對(duì)這個(gè)Message的value進(jìn)行解壓,然后從中按照Offset MessageSize Message的格式讀取一條條記錄,對(duì)于這次讀取的Message,就不再進(jìn)行深層迭代了。下面是deepIterator的makeNext方法,它被不斷調(diào)用以生成迭代器的元素

override def makeNext(): MessageAndOffset = {try {// read the offsetval offset = compressed.readLong()// read record sizeval size = compressed.readInt()if (size < Message.MinHeaderSize)throw new InvalidMessageException("Message found with corrupt size (" + size + ") in deep iterator")// read the record into an intermediate record buffer// and hence has to do extra copyval bufferArray = new Array[Byte](size)compressed.readFully(bufferArray, 0, size)val buffer = ByteBuffer.wrap(bufferArray)val newMessage = new Message(buffer)// the decompressed message should not be a wrapper message since we do not allow nested compressionnew MessageAndOffset(newMessage, offset)} catch {case eofe: EOFException =>compressed.close()allDone()case ioe: IOException =>throw new KafkaException(ioe)}}

KAFKA-1718

至于一個(gè)MessageSet中不能包含多個(gè)壓縮后的Message(壓縮后的Message也就是以壓縮后的MessageSet作為value的Message),Kafka Protocol中是這么說的

The outer MessageSet should contain only one compressed "Message" (see?KAFKA-1718?for details).

KAFKA-1718就是在Protocol里添加這么一個(gè)特殊說明的原因。事情是這樣的:

報(bào)各這個(gè)問題的人是Go語言client的作者,他發(fā)現(xiàn)自己發(fā)的Message明顯沒有過大,但是發(fā)生了MessageSizeTooLargeException。后來跟其它人討論,發(fā)現(xiàn)是因?yàn)閎roker端在調(diào)用Log.append時(shí),會(huì)把傳送給這個(gè)方法的MessageSet解壓開,然后再組合成一個(gè)壓縮后的MessageSet(ByteBufferMessageSet)。而Go語言的客戶端發(fā)送的MessageSet中包含了多個(gè)壓縮后的Message,這樣即使發(fā)送時(shí)的Message不會(huì)超過message.max.bytes的限制,但是broker端再次生成的Message就超過了這個(gè)限制。所以,Kafka Protocol對(duì)這種情況做了特殊說明:The outer MessageSet should contain only one compressed "Message"。

Compressed Message的offset

即然可以把壓縮后的MessageSet作為Message的value,那么這個(gè)Message的offset該如何設(shè)置呢?

這個(gè)offset的值只有兩種可能:1, 被壓縮的MessageSet里Message的最大offset; 2, 被壓縮的MessageSet里Message的最小offset.

這兩種取值沒有功能的不同,只有效率的不同。

由于FetchRequest協(xié)議中的offset是要求broker提供大于等于這個(gè)offset的消息,因此broker會(huì)檢查log,找到符合條件的,然后傳輸出去。那么由于FetchRequest中的offset位置的消息可位于一個(gè)compressed message中,所以broker需要確定一個(gè)compressed Message是否需要被包含在respone中。

  • 如果compressed Message的offset是它包含的MessageSet的最小offset。那么,我們對(duì)于這個(gè)Message是否應(yīng)包含在response中,無法給出"是”或"否“的回答。比如FetchRequest中指明的開始讀取的offset是14,而一個(gè)compressed Message的offset是13,那么這個(gè)Message中可能包含offset為14的消息,也可能不包含。
  • 如果compressed Message的offset是它包含的MessageSet的最大offset,那么,可以根據(jù)這個(gè)offset確定這個(gè)Message“不應(yīng)該”包含在response中。比如FetchRequest中指明的開始讀取的offset是14,那么如果一個(gè)compressed Message的offset是13,那它就不該被包含在response中。而當(dāng)我們順序排除這種不符合條件的Message,就可以找到第一個(gè)應(yīng)該被包含在response中的Message(壓縮或者未壓縮), 從它開始讀取。

在第一種情況下(最小offset),我們盡管可以通過連續(xù)的兩個(gè)Message確定第一個(gè)Message的offset范圍,但是這樣在讀取時(shí)需要在讀取第二個(gè)Message的offset之后跳回到第一個(gè)Message, ?這通常會(huì)使得最近一次讀(也就讀第二個(gè)offset)的文件系統(tǒng)的緩存失效。而且邏輯比第二種情況更復(fù)雜。在第二種情況下,broker只需要找到第一個(gè)其offset大于或等于目標(biāo)offset的Message,從它可以讀取即可,而且也通常能利用到文件系統(tǒng)緩存,因?yàn)閛ffset和消息內(nèi)容有可能在同一個(gè)緩存塊中。

在處理FetchRequest時(shí),broker的邏輯也正是如此。對(duì)FetchRequest的處理會(huì)調(diào)用到Log#read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None)方法,然后調(diào)用到LogSegment的read方法,它的之后的調(diào)用有很多,所有不貼代碼了,它的注釋說明了讀取的邏輯

* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified

即,返回的MessageSet的第一條Message的offset >= startOffset。

而在broker給compressed Message賦予offset時(shí),其邏輯也是賦予其包含的messages中的最大offset。這段邏輯在ByteBufferMessageSet的create方法中:

messageWriter.write(codec = compressionCodec) { outputStream =>val output = new DataOutputStream(CompressionFactory(compressionCodec, outputStream)) //創(chuàng)建壓縮流try {for (message <- messages) {offset = offsetCounter.getAndIncrement //offsetCounter是一個(gè)AtomicLong,使用它的當(dāng)前值作為這條Message的offset,然后+1作為下一條消息的offsetoutput.writeLong(offset)//寫入這條日志記錄的offsetoutput.writeInt(message.size)//寫入這條日志記錄的大小output.write(message.buffer.array, message.buffer.arrayOffset, message.buffer.limit) //寫入這條記錄的Message }} finally {output.close()}}val buffer = ByteBuffer.allocate(messageWriter.size + MessageSet.LogOverhead)writeMessage(buffer, messageWriter, offset)//以最后一個(gè)Message的offset作為這個(gè)compressed Message的offset

Validate Message

什么需要驗(yàn)證?

先看一下消息的哪些特征需要被驗(yàn)證。

首先,網(wǎng)絡(luò)傳輸過程中,數(shù)據(jù)可能會(huì)產(chǎn)生錯(cuò)誤,即使是寫在磁盤上的消息,也可能會(huì)由于磁盤的問題產(chǎn)生錯(cuò)誤。因此,broker對(duì)接收到的消息需要驗(yàn)證其完整性。這里的消息就是前邊協(xié)議里定義的Message。對(duì)于消息完整性的檢測(cè),是使用CRC32校驗(yàn),但是并不是對(duì)消息的所有部分計(jì)算CRC,而是對(duì)Message的Crc部分以后的部分,不包括記錄的offset和MessageSize部分。把offset和MessageSize加到CRC計(jì)算中,可以對(duì)完整性有更強(qiáng)的估證,但是壞處在于這兩個(gè)部分在消息由producer到達(dá)broker以后,會(huì)被broker重寫,因此如果把它們計(jì)算在crc里邊,就需要在broker端重新計(jì)算crc32,這樣會(huì)帶來額外的開銷。

CRC32沒有檢測(cè)出錯(cuò)誤的概率在0.0047%以下,加上TCP本身也有校驗(yàn)機(jī)制,不能檢測(cè)出錯(cuò)誤的概率就很小了(這個(gè)還需要再仔細(xì)算一下)。

除了消息的完整性,還需要對(duì)消息的合規(guī)性進(jìn)行檢驗(yàn),主要是檢驗(yàn)offset是否是單調(diào)增長(zhǎng)的,以及MessageSize是超過了最大值。

這里檢驗(yàn)時(shí)使用的MessageSize就不是Message本身的大小了,而是一個(gè)記錄的大小,包括offset和MessageSize,這個(gè)也挺奇怪的,有必要非拉上這倆嗎?

而且在broker端檢驗(yàn)producer發(fā)來的MessageSet時(shí),也沒必要檢驗(yàn)它的offset是否是單調(diào)增長(zhǎng)的呀,畢竟leader還要對(duì)Message的offset重新賦值。而follower是從leader處拉取的,如果網(wǎng)絡(luò)或者磁盤出錯(cuò),通過對(duì)offset的單調(diào)性檢查也可能會(huì)漏掉出錯(cuò)了的記錄,對(duì)于consumer來說也是同理。所以這里有點(diǎn)奇怪。

何時(shí)需要驗(yàn)證?

在broker把收到的producer request里的MessageSet append到Log之前,以及consumer和follower獲取消息之后,都需要進(jìn)行校驗(yàn)。

這種情況分成兩種:

1. broker和consumer把收到的消息append到log之前

2. consumser收到消息后

第一種情況都是在調(diào)用Log#append時(shí)進(jìn)行檢驗(yàn)的。

如何驗(yàn)證?

先看下Log#append的方法聲明

def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo

在replica的fetcher線程調(diào)用append方法時(shí),會(huì)把a(bǔ)ssignOffsets設(shè)成false,而leader處理produce request時(shí),會(huì)把a(bǔ)ssignOffsets設(shè)成true。

下面append方法的一部分代碼

val appendInfo = analyzeAndValidateMessageSet(messages) //驗(yàn)證消息// if we have any valid messages, append them to the logif(appendInfo.shallowCount == 0)return appendInfo// trim any invalid bytes or partial messages before appending it to the on-disk logvar validMessages = trimInvalidBytes(messages, appendInfo)//trim掉不可用的部分或者殘缺的消息try {// they are valid, insert them in the loglock synchronized {appendInfo.firstOffset = nextOffsetMetadata.messageOffset if(assignOffsets) { //如果需要重新賦予offset// assign offsets to the message setval offset = new AtomicLong(nextOffsetMetadata.messageOffset)try {validMessages = validMessages.validateMessagesAndAssignOffsets(offset, appendInfo.sourceCodec, appendInfo.targetCodec, config.compact) //驗(yàn)證消息并且賦予offset} catch {case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)}appendInfo.lastOffset = offset.get - 1} else {// we are taking the offsets we are givenif(!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)throw new IllegalArgumentException("Out of order offsets found in " + messages)}// re-validate message sizes since after re-compression some may exceed the limit 對(duì)壓縮后消息重新驗(yàn)證MessageSize是否超過了允許的最大值for(messageAndOffset <- validMessages.shallowIterator) {if(MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {// we record the original message set size instead of trimmed size// to be consistent with pre-compression bytesRejectedRate recording BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesRejectedRate.mark(messages.sizeInBytes)BrokerTopicStats.getBrokerAllTopicsStats.bytesRejectedRate.mark(messages.sizeInBytes)throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(messageAndOffset.message), config.maxMessageSize))}}

注意到對(duì)MessageSize驗(yàn)證了兩次,第二次是對(duì)重新壓縮后的消息。KAFKA-1718里提到MessageSizeToLargeException,就是在這時(shí)候檢測(cè)出來的。

初步檢驗(yàn):analyzeAndValidateMessageSet

具體的檢驗(yàn)消息完整性和offset單調(diào)增長(zhǎng)的邏輯在analyzeAndValidateMessageSet方法里。這個(gè)方法的實(shí)現(xiàn)里,需要注意幾點(diǎn):

  • 它是使用ByteBufferMessageSize的shallowIterator來對(duì)這個(gè)MessageSet的消息進(jìn)行迭代,這也意味著并不會(huì)對(duì)compressed message里邊的MessageSet解壓后再進(jìn)行檢驗(yàn),而是把comprssed message作為單個(gè)Message進(jìn)行檢驗(yàn)。
  • 它計(jì)算checksum時(shí),是計(jì)算的MagicByte及其以后的內(nèi)容。 def computeChecksum(): Long = CoreUtils.crc32(buffer.array, buffer.arrayOffset + MagicOffset, buffer.limit - MagicOffset)

    ?

  • 它比較的是entrySize與MaxMessageSize的大小,來確定這個(gè)消息是否太大 def entrySize(message: Message): Int = LogOverhead + message.size---------------------------------val MessageSizeLength = 4val OffsetLength = 8val LogOverhead = MessageSizeLength + OffsetLength

    ?

  • 它返回的LogAppendInfo中會(huì)包括一個(gè)targetCodec,指明這個(gè)MessageSet將要使用的壓縮方式。leader處理produce request時(shí),將使用這個(gè)壓縮方式重新壓縮整個(gè)MessageSet。 val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)

    config.compressionType就是broker配置里的compression.type的值,如果它是“producer", 就會(huì)使用producer request使用壓縮方式,否則就使用config.compressionType指明的壓縮方式。注意如果一個(gè)MessageSet里的Message采用了不同的壓縮方式,最后被當(dāng)成sourceCodec的是最后一個(gè)壓縮了的消息的壓縮方式。

  • 再次檢驗(yàn)并且賦予offset :validateMessagesAndAssignOffsets

    只有l(wèi)eader處理produce request時(shí),會(huì)調(diào)用ByteBufferMessageSet的這個(gè)方法。 它不會(huì)檢測(cè)analyzeAndValidateMessageSet已經(jīng)檢測(cè)的內(nèi)容,但是會(huì)把這個(gè)MessageSet進(jìn)行深度遍歷(即如果它里邊的消息是壓縮后,就把這個(gè)消息解壓開再遍歷),這樣它就能做analyzeAndValidateMessageSet不能進(jìn)行的檢測(cè):對(duì)于compacted topic檢測(cè)其key是否為空,如果為空就拋出InvalidMessageException。

    另外,它會(huì)把深度遍歷后獲得的Message放在一起重新壓縮。

    如果MessageSet的尾部不是完整的Message呢?

    這是在獲取ByteBufferMessageSet的iternalIterator時(shí)候處理的。

    def makeNextOuter: MessageAndOffset = {// if there isn't at least an offset and size, we are doneif (topIter.remaining < 12)return allDone()val offset = topIter.getLong()val size = topIter.getInt()if(size < Message.MinHeaderSize)throw new InvalidMessageException("Message found with corrupt size (" + size + ") in shallow iterator")// we have an incomplete messageif(topIter.remaining < size)return allDone(). ...}

    注意返回allDone()和拋出InvalidMessageException的時(shí)機(jī)。

    • 如果這個(gè)MessageSet剩下部分不到12bytes,那剩下的部分就是下一個(gè)MessageSet頭部的一部分,是沒法處理的,也是沒辦法檢驗(yàn)的,因此就返回allDone。
    • 如果夠12bytes,就可以讀出offset和MessageSize。MessageSize至少會(huì)大于Message頭里邊的那些crc、Attributes, MagicBytes等加起來的大小,因此如果MessageSize比這個(gè)還小,就肯定是個(gè)entry有問題,所以就拋出異常。這里的問題在于,即使MessageSet最后的那個(gè)Message是不完整的,只要MessageSize有問題,也會(huì)拋異常,而不是忽略這個(gè)不完整的Message。(這個(gè)可能是沒考慮到,也可能是有別的考慮,不過無論怎么處理最后的這個(gè)不完整的Message,都有一定的道理)。

    ?consumer端的驗(yàn)證

    consumer(0.9)會(huì)檢查checksum,不過是可以配置的,原因正如config里說的一樣。

    public static final String CHECK_CRCS_CONFIG = "check.crcs";private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";

    config的文檔說,檢查checksum是為了"ensures no on-the-wire or on-disk corruption to the message occurred."即,為了保證沒有在網(wǎng)絡(luò)傳輸出或者磁盤存儲(chǔ)時(shí)出現(xiàn)了消息的損壞。但是checksum計(jì)算時(shí)會(huì)帶來開銷,所以追求最佳性能,可以關(guān)掉checksum的檢查。


    ?

    下面來看一下幾個(gè)與消息格式相關(guān)的KIP。為什么需要這些改變呢?為什么之前沒有實(shí)現(xiàn)這些改變呢?都是因?yàn)楦鞣N折衷吧,需求與性能折衷,需求與實(shí)現(xiàn)所需的工作量的折衷……

    下面的幾個(gè)KIP可能會(huì)一起加上去,畢竟都是對(duì)消息格式的修改,不能搞沖突了。

    KIP-31 - Move to relative offsets in compressed message sets

    前邊提到了,在leader收到ProduceRequet之后,它會(huì)解壓開compressed message(也就是是這個(gè)KIP里的compressed messageset,這兩說說法的確有些亂),然后給里邊包含的message set的每條消息重新賦予offset。這個(gè)做法也是應(yīng)該的,乍一看也沒什么不好。但是問題在于,不僅是直接改個(gè)offset這么簡(jiǎn)單,在改完之后,需要重新壓縮這些消息,還要計(jì)算。這么一搞,開銷就大了。KIP-31就是想把這部分的性能損失降下來。(這個(gè)KIP已經(jīng)是accepted狀態(tài))

    做法是把在一個(gè)compressed message set里邊的每個(gè)message的offset里記下當(dāng)前message相對(duì)于外層的wrapper message的偏移。用漢語說這個(gè)意思比較費(fèi)勁,KIP里這么說

    When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.

    When broker receives a compressed message, it only needs to?

  • Decompress the message to verify the CRC and relative offset.
  • Set outer message's base offset. The outer message's base offset will be the offset of the last inner message.? (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)
  • 注意,這個(gè)wrapper message里記的base offset, 是它所含的message set里的最后一個(gè)message的offset。這個(gè)和當(dāng)前的compressed message的offset是一致的。

    然后當(dāng)broker收到一個(gè)壓縮后的消息時(shí),它只需要

    • 驗(yàn)證CRC與realtive offset的正確性
    • 重新設(shè)定外層消息的offset,也就是base offset。

    KIP-32 - Add timestamps to Kafka message

    在消息里加時(shí)間戳。需要注意的是,這個(gè)KIP還在討論中(以下的內(nèi)容是基于2016年1月7日的版本)。不像上一個(gè)已經(jīng)確定了。

    (俺是覺得這個(gè)事情早該做了……)

    首先,來看一下動(dòng)機(jī),這個(gè)提有意思

    Motivation

    This KIP tries to address the following issues in Kafka.

  • Log retention might not be honored:?Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  • Log rolling might break for a newly created replica as well because of the same reason as (1).
  • Some use cases such as streaming processing needs a timestamp in messages.
  • 說的是這幾個(gè)原因

    1. Log retention會(huì)不靠譜。當(dāng)前l(fā)og retention是在log segment層面做的,是按照log segment的最后修改時(shí)間確定是否要?jiǎng)h除一個(gè)log segment. 但是,當(dāng)replica重分配發(fā)生時(shí),新被分配的這個(gè)replica的log segment的修改時(shí)間會(huì)被設(shè)成當(dāng)前時(shí)間。這么一來,它就不能被按照log retention想要做的那樣(實(shí)際上是想把一段時(shí)間之前的消息刪除)被刪除。

    2. 由于和1同樣的原因,對(duì)于一個(gè)新創(chuàng)建的replica(意思應(yīng)該是移動(dòng)位置的replica, 并不是增加分區(qū)后新加的replica)log rolling有時(shí)候也會(huì)不靠譜。

    3. 有些場(chǎng)景中需要消息含有時(shí)間戳,比如流處理。

    感覺,貌似第三個(gè)原因才是決定性的,擁抱流處理。

    接口的變化

    準(zhǔn)備在Message里加入timestamp字段

    準(zhǔn)備增加兩個(gè)配置

    • message.timestamp.type 可以選CreateTime或者LogAppendTime,CreateTime就是這條消息生成的時(shí)間,是在producer端指定的。LogAppendTime就是append到log的時(shí)間(實(shí)現(xiàn)細(xì)節(jié)沒有說明)。
    • max.message.time.difference.ms 如果選擇了CreateTime, 那么只有當(dāng)createTime和broker的本地時(shí)間相差在這個(gè)配置指定的差距之內(nèi),broker才會(huì)接受這條消息。

    糾結(jié)之處

    之前關(guān)于這個(gè)KIP的討論主要是關(guān)于使用哪個(gè)時(shí)間, 是使用LogAppendTime(broker time),還是CreateTime(application time)。

    兩種都有利有弊:

    The good things about LogAppendTime are: 使用LogAppendTime的好處在于

  • Broker is more robust. Broker比起用戶程序更健壯(更不容易出錯(cuò),比如用戶程序可能有bug,導(dǎo)致CreateTime設(shè)置的不正確,想一想KIP-33,如果錯(cuò)得離譜,索引怎么建?)
  • Monotonically increasing. LogAppendTime是單調(diào)增長(zhǎng)的。(但是,follower收到的消息的timestamp該怎么設(shè)呢?如果不用leader帶來的,就不能確定是否monotonically increasing)
  • Deterministic behavior for log rolling and retention.log rolling和retention的行為是確定性的。(如果按消息里的這個(gè)timestamp來決定這兩個(gè)操作的行為,那么讓用戶指定timestamp的確挺危險(xiǎn)的)
  • If CreateTime is required, it can always be put into the message payload.如果需要CreateTime,可以加到消息的內(nèi)容里。(這個(gè)的確是……)
  • The good things about CreateTime are: 使用CreateTime的好處是

  • More intuitive to users. 更符合用戶的思維(用戶當(dāng)然是想使用自己填進(jìn)去的時(shí)間)。
  • User may want to have log retention based on when the message is created instead of when the message enters the pipeline.用戶可能更希望用消息被創(chuàng)建的時(shí)間來決定log retention的行為,而不是消息進(jìn)行處理管道的時(shí)間。
  • Immutable after entering the pipeline.這樣,消息的timestamp在進(jìn)入管道后就不會(huì)再改變了。
  • 在俺看來,這兩個(gè)選擇的確挺糾結(jié)的。用戶肯定是想用自己產(chǎn)生消息的時(shí)間,不然很難準(zhǔn)確地找到一條消息。但是,如果使用用戶指定的時(shí)間,broker端的行為就變得復(fù)雜了,比如,如果用戶指定的時(shí)間不是單調(diào)遞增的,該怎么建時(shí)間索引。但是用戶產(chǎn)生畸形的時(shí)間,倒可以通過配置里max.message.time.difference.ms來控制?;蛟S可以加另一個(gè)配置,允許broker在一定范圍內(nèi)修改CreateTime,比如最多可以更改1000ms。這樣就能即使消息的timestamp單調(diào)增長(zhǎng),也能使用戶對(duì)消息的時(shí)間的估計(jì)比較準(zhǔn)確。不過,這樣可能就需要讓broker time的含義變成broker收到消息時(shí)間,而不是append到log的時(shí)間。否則就難以確定何時(shí)該拒絕無法在指定范圍內(nèi)修改timestamp的消息。

    ?

    KIP-33 - Add a time based log index

    動(dòng)機(jī):

    當(dāng)前按照時(shí)間戳查找offset得到的結(jié)果是非常粗粒度的,只能在log segment的級(jí)別。(對(duì)于reassigned replica就差得沒譜了。)所以這個(gè)KIP提議建一個(gè)基于時(shí)間的對(duì)日志的索引,來允許按timestamp搜索消息的結(jié)果更準(zhǔn)確。

    這個(gè)KIP和KIP-32是緊密相關(guān)的。這倆KIP都在討論過程中。

    ?

    轉(zhuǎn)載于:https://www.cnblogs.com/devos/p/5100611.html

    總結(jié)

    以上是生活随笔為你收集整理的Kafka的消息格式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 日本护士╳╳╳hd少妇 | 美女又黄又免费的视频 | 日韩天堂网| 亚洲av无码一区二区三区在线播放 | 日日色综合 | 少妇户外露出[11p] | 欧美 亚洲 | 亚洲欧美日本一区二区三区 | 日本极品喷水 | 可以在线观看的黄色 | 美国黄色一级视频 | 在线观看不卡一区 | 亚洲一区在线电影 | 91玉足脚交嫩脚丫在线播放 | 日韩乱码一区二区 | 欧美日韩国产区 | 精品人伦一区二区三 | 驯服少爷漫画免费观看下拉式漫画 | 欧美一区亚洲二区 | 亚洲国产www | 亚洲av乱码一区二区 | 精品无码人妻少妇久久久久久 | 草草视频在线播放 | 亚洲欧美在线视频免费 | 性歌舞团一区二区三区视频 | 男女无遮挡网站 | 99久久精品免费看 | 国产色播av在线 | 日本a视频 | 粉嫩av一区 | 色婷婷97 | 美女网站免费观看视频 | 亚洲三级免费观看 | 樱花影院电视剧免费 | 国产视频日韩 | 亚洲精品一二三 | 潘金莲一级淫片aaaaa | 夫妻毛片 | 国产一区二区三区四区五区美女 | 日韩黄色网 | 97精品国产97久久久久久春色 | 久久噜噜噜精品国产亚洲综合 | 日本一区二区三区电影在线观看 | 精品网站999 | 久久久一本 | 男女黄床上色视频 | 男女做激情爱呻吟口述全过程 | 在线免费黄色 | 亚欧洲精品在线视频 | 成人黄色大片在线观看 | 台湾佬美性中文网 | 婷婷六月在线 | 久久福利精品 | 中文国产字幕 | 天天干视频| 蜜臀999| 日韩avav | 欧美伦理一区 | 日韩欧美一区二区视频 | 国产精品久久久国产盗摄 | 国产视频福利在线 | 国产一卡二卡三卡四卡 | 女同一区 | 国产一区二区三区视频在线 | 日本视频一区二区三区 | 人妻少妇精品视频一区二区三区 | 日韩在线天堂 | 影音先锋中文字幕人妻 | 狠狠做深爱婷婷综合一区 | 高清日韩 | 伊人久久久久噜噜噜亚洲熟女综合 | 日日躁夜夜躁狠狠久久av | 美女视频免费在线观看 | 亚洲一区二区三区四 | 国产剧情一区二区三区 | 午夜宅男网 | 少妇99 | 日本三级全黄 | 永久免费视频网站直接看 | 国产中文字幕一区二区 | 亚洲淫| 免费吸乳羞羞网站视频 | 亚洲在线观看免费视频 | 天堂色av| 国产97在线观看 | 国产精品日韩一区二区 | 无码人妻aⅴ一区二区三区有奶水 | 欧美日韩一区二区视频在线观看 | 仙踪林久久久久久久999 | 亚洲精品aⅴ中文字幕乱码 国产精品调教视频 | 欧美自拍偷拍第一页 | 亚洲男女在线观看 | 天天毛片| 人人爽人人香蕉 | 蜜乳av一区 | www久久久久| 成人国产在线视频 | 亚洲欧美日韩国产一区二区 | 密色av |