Reading the Kafka Producer Message-Send Source Code
Today I read the message-sending part of the Kafka producer source code (version 0.8.2.1). For outgoing messages, the partitioning strategy is as follows:
1 Kafka's partitioning strategy
1.1 If a partition is specified, the message is sent to that partition.
1.2 If no partition is specified but a key is, a partition is chosen from the hash of the key. With a fixed key, every message lands on the same single partition, so do not set the key to a fixed value; if you must, consider modifying the Kafka source so data is still spread evenly across partitions.
1.3 If neither key nor partition is specified, partitions are chosen round-robin.
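The three rules above can be sketched as follows. This is an illustrative stand-in, not the real DefaultPartitioner (which hashes the serialized key bytes); `PartitionChooser`, the `Integer` arguments, and the counter are all assumptions made for the sketch.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the three partitioning rules: explicit partition > key hash > round-robin.
public class PartitionChooser {
    private final AtomicInteger counter = new AtomicInteger(0);

    // partition: explicitly requested partition, or null
    // keyHash:   hash of the record key, or null when no key was given
    public int choose(Integer partition, Integer keyHash, int numPartitions) {
        if (partition != null)
            return partition;                         // rule 1.1: explicit partition wins
        if (keyHash != null)
            return Math.abs(keyHash) % numPartitions; // rule 1.2: fixed key -> fixed partition
        // rule 1.3: no key, no partition -> rotate across all partitions
        return Math.abs(counter.getAndIncrement()) % numPartitions;
    }
}
```

Note how rule 1.2 makes the "fixed key" problem visible: the same `keyHash` always yields the same partition.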
2 Message sending is asynchronous; the send path involves three objects:
2.1 RecordAccumulator
Maintains a ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches field: each partition maps to an ArrayDeque of RecordBatch objects.
Calling KafkaProducer.send ultimately appends the record to the RecordAccumulator via its append method.
If the current RecordBatch is full, or a new RecordBatch had to be created, the Sender thread is woken up.
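The append-then-wake pattern just described can be sketched with plain collections standing in for the real RecordBatch/BufferPool machinery (`MiniAccumulator`, `BATCH_SIZE`, and the String records are all illustrative, not Kafka types):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified sketch of RecordAccumulator.append: one deque of batches per
// partition; the caller wakes the Sender when a batch fills or a new batch
// is created.
public class MiniAccumulator {
    static final int BATCH_SIZE = 3; // records per batch, for illustration only

    private final ConcurrentMap<String, Deque<List<String>>> batches =
            new ConcurrentHashMap<>();

    /** Appends one record; returns true when the Sender should be woken. */
    public boolean append(String topicPartition, String record) {
        Deque<List<String>> dq =
                batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (dq) {
            List<String> last = dq.peekLast();
            if (last != null && last.size() < BATCH_SIZE) {
                last.add(record);
                return last.size() == BATCH_SIZE; // batch just became full -> wake Sender
            }
            List<String> fresh = new ArrayList<>();
            fresh.add(record);
            dq.addLast(fresh);
            return true; // a new batch was created -> wake Sender
        }
    }
}
```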
2.2 Sender
"The background thread that handles the sending of produce requests to the Kafka cluster" (its Javadoc).
The Sender uses a KafkaClient to write the RecordAccumulator's batches to the server in bulk.
Sender's run() method simply loops while the producer is running, calling run(long now) on each pass. Inside run(long now), the logic is:
2.2.1 First, determine which nodes have data ready to send.
2.2.2 Remove any Kafka node the client cannot currently send to.
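Step 2.2.2 amounts to filtering the ready-node list through a connection check. A minimal sketch, with a hypothetical `Client` interface standing in for NetworkClient's readiness test:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of step 2.2.2: drop nodes the client cannot currently send to
// (e.g. no live connection, or the in-flight request limit is reached).
// Node ids are plain Strings here; the Client interface is a stand-in.
public class ReadyNodeFilter {
    interface Client { boolean ready(String node, long now); }

    public static List<String> filter(List<String> readyNodes, Client client, long now) {
        List<String> result = new ArrayList<>(readyNodes);
        Iterator<String> it = result.iterator();
        while (it.hasNext()) {
            if (!client.ready(it.next(), now))
                it.remove(); // not sendable right now; skip it this round
        }
        return result;
    }
}
```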
2.2.3 Collect the list of batches to send.
For each partition whose leader lives on the node, take that partition's RecordBatch and add it to the node's List<RecordBatch>.
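The drain step above can be sketched like this; the maps, String node/partition ids, and `DrainSketch` are simplified stand-ins for the real RecordAccumulator.drain and cluster metadata:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of step 2.2.3: for each ready node, collect one batch per
// partition whose leader is that node.
public class DrainSketch {
    public static Map<String, List<String>> drain(
            Map<String, String> leaderByPartition,         // partition -> leader node
            Map<String, Deque<String>> batchesByPartition, // partition -> queued batches
            Set<String> readyNodes) {
        Map<String, List<String>> perNode = new HashMap<>();
        for (String node : readyNodes) {
            List<String> ready = new ArrayList<>();
            for (Map.Entry<String, String> e : leaderByPartition.entrySet()) {
                if (!e.getValue().equals(node))
                    continue;                  // only partitions led by this node
                Deque<String> dq = batchesByPartition.get(e.getKey());
                if (dq != null && !dq.isEmpty())
                    ready.add(dq.pollFirst()); // take the oldest batch
            }
            perNode.put(node, ready);
        }
        return perNode;
    }
}
```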
2.2.4 Build the produce requests and send them to the corresponding Kafka nodes.
Compute pollTimeout and dispatch the request objects to each node:
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    log.trace("Created {} produce requests: {}", requests.size(), requests);
    pollTimeout = 0;
}
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now);

2.2.5 Handle the responses that come back

for (ClientResponse response : responses) {
    if (response.wasDisconnected())
        handleDisconnect(response, now);
    else
        handleResponse(response, now);
}

2.3 KafkaClient
其實(shí)現(xiàn)類是:NetworkClient,基于socket方式與server進(jìn)行數(shù)據(jù)交互
3 Kafka producer configuration
batch.size (backed by MemoryRecords): buffer size for one batch of records, default 16384.
buffer.memory (backed by BufferPool): total memory the client uses to buffer records awaiting send, 32 * 1024 * 1024L, i.e. 32 MB.
linger.ms: artificial send delay. Set to one second (linger.ms=1000), records sit in the client buffer for up to a second before being sent; the default 0 means send immediately.
compression.type: compression codec; one of none, gzip, snappy, lz4; default is none.
In theory, raising linger.ms improves message throughput, since more records accumulate into each batch.
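The parameters above, gathered into one place as they would appear in a producer config. The broker address is a placeholder; the property names are the standard Kafka producer keys and the values mirror the defaults quoted in the text:

```java
import java.util.Properties;

// Producer configuration using the parameters discussed above.
public class ProducerConfigExample {
    public static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");             // placeholder broker address
        props.put("batch.size", "16384");                             // per-partition batch buffer (MemoryRecords)
        props.put("buffer.memory", String.valueOf(32 * 1024 * 1024)); // total BufferPool size, 32 MB
        props.put("linger.ms", "0");                                  // 0 = send immediately; raise to batch more
        props.put("compression.type", "none");                        // none, gzip, snappy or lz4
        return props;
    }
}
```

These Properties would then be passed to the KafkaProducer constructor.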
Reposted from: https://my.oschina.net/cloudcoder/blog/917309