日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka Producer源码简述

發布時間:2023/12/20 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka Producer源码简述 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

  接著上文kafka的簡述,這一章我們一探kafka生產者是如何發送消息到消息服務器的。

?代碼的入口還是從

kafkaTemplate.send開始

?

最終我們就會到

org.springframework.kafka.core.KafkaTemplate#doSend方法

這里的關鍵就是

org.apache.kafka.clients.producer.Producer#send(org.apache.kafka.clients.producer.ProducerRecord<K,V>, org.apache.kafka.clients.producer.Callback)

我們再一路點擊下去,一直到

?org.apache.kafka.clients.producer.KafkaProducer#doSend方法

?

這里將步驟分為五步

1.更新Metadata,Metadata用于存儲部分topic數據 2.將發送內容序列化 3.如果我們有多個分區的話,在這里會根據算法選擇相應的分區 4.向accumulator寫入數據,accumulator是一種ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;結構,在這里對發送數據做零時緩存
5.緩存的夠多了,喚醒線程發送數據。

所以看到這里我們就明白了,kafka不是直接將數據發送到服務器。而是緩存到內存中,知道大于batchsize才去做發送

?

?

接下來我們看下sender線程做了什么

直接來到

org.apache.kafka.clients.producer.internals.Sender#run(long)

?

1.連接的獲取,

org.apache.kafka.clients.NetworkClient#initiateConnect

?

具體的connect代碼如下

首先與kafka serve端建立了一個non blocking 的SocketChannel,然后將該channel注冊到一個java.nio.channels.Selector上面,并注冊OP_CONNECT事件。

接下來,我們再看下消息的發送

首先調用

client.send(request, now);

這個方法最終會調用

org.apache.kafka.common.network.KafkaChannel#setSend

為每個request注冊

OP_WRITE事件

?同時把send傳遞進來

接下來調用?

this.client.poll(pollTimeout, now);

這個的調用鏈是

org.apache.kafka.common.network.Selector#poll---->?org.apache.kafka.common.network.Selector#pollSelectionKeys--->

這里的

key.isWritable()

就是我們上文注冊寫事件,當所有的都準備好了,我們調用channel將消息發送到服務端

?

?

到這里我們就知道了kafka發送消息的大致流程。本文并沒有對細節深入,只想對kafka做出快速的了解。




轉載于:https://www.cnblogs.com/xmzJava/p/9536351.html

總結

以上是生活随笔為你收集整理的Kafka Producer源码简述的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。