Kafka Producer源码简述
生活随笔
收集整理的這篇文章主要介紹了
Kafka Producer源码简述
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
接著上文kafka的簡述,這一章我們一探kafka生產者是如何發送消息到消息服務器的。
?代碼的入口還是從
kafkaTemplate.send開始?
最終我們就會到
org.springframework.kafka.core.KafkaTemplate#doSend方法這里的關鍵就是
org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback)我們再一路點擊下去,一直到
?org.apache.kafka.clients.producer.KafkaProducer#doSend方法?
這里將步驟分為五步
1.更新Metadata,Metadata用于存儲部分topic數據 2.將發送內容序列化 3.如果我們有多個分區的話,在這里會根據算法選擇相應的分區 4.向accumulator寫入數據,accumulator是一種ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;結構,在這里對發送數據做零時緩存5.緩存的夠多了,喚醒線程發送數據。
所以看到這里我們就明白了,kafka不是直接將數據發送到服務器。而是緩存到內存中,知道大于batchsize才去做發送
?
?
接下來我們看下sender線程做了什么
直接來到
org.apache.kafka.clients.producer.internals.Sender#run(long)?
1.連接的獲取,
org.apache.kafka.clients.NetworkClient#initiateConnect?
具體的connect代碼如下
首先與kafka serve端建立了一個non blocking 的SocketChannel,然后將該channel注冊到一個java.nio.channels.Selector上面,并注冊OP_CONNECT事件。
接下來,我們再看下消息的發送
首先調用
client.send(request, now);這個方法最終會調用
org.apache.kafka.common.network.KafkaChannel#setSend為每個request注冊
OP_WRITE事件?同時把send傳遞進來
接下來調用?
this.client.poll(pollTimeout, now);這個的調用鏈是
org.apache.kafka.common.network.Selector#poll---->?org.apache.kafka.common.network.Selector#pollSelectionKeys--->這里的
key.isWritable()就是我們上文注冊寫事件,當所有的都準備好了,我們調用channel將消息發送到服務端
?
?
到這里我們就知道了kafka發送消息的大致流程。本文并沒有對細節深入,只想對kafka做出快速的了解。
轉載于:https://www.cnblogs.com/xmzJava/p/9536351.html
總結
以上是生活随笔為你收集整理的Kafka Producer源码简述的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Axure RP8下载以及注册
- 下一篇: 织梦文章添加字段填栏目id,内容页调用字