日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

apache kafka源码分析-Producer分析---转载

發(fā)布時間:2025/4/5 编程问答 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 apache kafka源码分析-Producer分析---转载 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

原文地址:http://www.aboutyun.com/thread-9938-1-1.html

問題導(dǎo)讀
1.Kafka提供了Producer類作為java producer的api,此類有幾種發(fā)送方式?
2.總結(jié)調(diào)用producer.send方法包含哪些流程?
3.Producer難以理解的在什么地方?

producer的發(fā)送方式剖析
Kafka提供了Producer類作為java producer的api,該類有sync和async兩種發(fā)送方式。
sync架構(gòu)圖


async架構(gòu)圖


調(diào)用流程如下:


代碼流程如下:
Producer:當(dāng)new Producer(new ProducerConfig()),其底層實現(xiàn),實際會產(chǎn)生兩個核心類的實例:Producer、DefaultEventHandler。在創(chuàng)建的同時,會默認(rèn)new一個ProducerPool,即我們每new一個java的Producer類,就會有創(chuàng)建Producer、EventHandler和ProducerPool,ProducerPool為連接不同kafka broker的池,初始連接個數(shù)有broker.list參數(shù)決定。
調(diào)用producer.send方法流程:
當(dāng)應(yīng)用程序調(diào)用producer.send方法時,其內(nèi)部其實調(diào)的是eventhandler.handle(message)方法,eventHandler會首先序列化該消息,
eventHandler.serialize(events)-->dispatchSerializedData()-->partitionAndCollate()-->send()-->SyncProducer.send()
調(diào)用邏輯解釋:當(dāng)客戶端應(yīng)用程序調(diào)用producer發(fā)送消息messages時(既可以發(fā)送單條消息,也可以發(fā)送List多條消息),調(diào)用eventhandler.serialize首先序列化所有消息,序列化操作用戶可以自定義實現(xiàn)Encoder接口,下一步調(diào)用partitionAndCollate根據(jù)topics的messages進(jìn)行分組操作,messages分配給dataPerBroker(多個不同的Broker的Map),根據(jù)不同Broker調(diào)用不同的SyncProducer.send批量發(fā)送消息數(shù)據(jù),SyncProducer包裝了nio網(wǎng)絡(luò)操作信息。
Producer的sync與async發(fā)送消息處理,大家看以上架構(gòu)圖一目了然。
partitionAndCollate方法詳細(xì)作用:獲取所有partitions的leader所在leaderBrokerId(就是在該partiionid的leader分布在哪個broker上),
創(chuàng)建一個HashMap>>>,把messages按照brokerId分組組裝數(shù)據(jù),然后為SyncProducer分別發(fā)送消息作準(zhǔn)備工作。

名稱解釋:partKey:分區(qū)關(guān)鍵字,當(dāng)客戶端應(yīng)用程序?qū)崿F(xiàn)Partitioner接口時,傳入?yún)?shù)key為分區(qū)關(guān)鍵字,根據(jù)key和numPartitions,返回分區(qū)(partitions)索引。記住partitions分區(qū)索引是從0開始的。

Producer平滑擴(kuò)容機(jī)制
如果開發(fā)過producer客戶端代碼,會知道m(xù)etadata.broker.list參數(shù),它的含義是kafak broker的ip和port列表,producer初始化時,就連接這幾個broker,這時大家會有疑問,producer支持kafka cluster新增broker節(jié)點?它又沒有監(jiān)聽zk broker節(jié)點或從zk中獲取broker信息,答案是肯定的,producer可以支持平滑擴(kuò)容broker,他是通過定時與現(xiàn)有的metadata.broker.list通信,獲取新增broker信息,然后把新建的SyncProducer放入ProducerPool中。等待后續(xù)應(yīng)用程序調(diào)用。

DefaultEventHandler類中初始化實例化BrokerPartitionInfo類,然后定期brokerPartitionInfo.updateInfo方法,DefaultEventHandler部分代碼如下:def handle(events: Seq[KeyedMessage[K,V]]) {......while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)if (topicMetadataRefreshInterval >= 0 &&SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))sendPartitionPerTopicCache.clear()topicMetadataToRefresh.clearlastTopicMetadataRefreshTime = SystemTime.milliseconds}outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)if (outstandingProduceRequests.size > 0) {info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))//休眠時間,多長時間刷新一次 Thread.sleep(config.retryBackoffMs)// 生產(chǎn)者定期請求刷新最新topics的broker元數(shù)據(jù)信息 Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement)).....}}}

BrokerPartitionInfo的updateInfo方法代碼如下:

def updateInfo(topics: Set[String], correlationId: Int) {var topicsMetadata: Seq[TopicMetadata] = Nil//根據(jù)topics列表,meta.broker.list,其他配置參數(shù),correlationId表示請求次數(shù),一個計數(shù)器參數(shù)而已//創(chuàng)建一個topicMetadataRequest,并隨機(jī)的選取傳入的broker信息中任何一個去取metadata,直到取到為止val topicMetadataResponse = ClientUtils.fetchTopicMetadata(topics, brokers, producerConfig, correlationId)topicsMetadata = topicMetadataResponse.topicsMetadata// throw partition specific exceptiontopicsMetadata.foreach(tmd =>{trace("Metadata for topic %s is %s".format(tmd.topic, tmd))if(tmd.errorCode == ErrorMapping.NoError) {topicPartitionInfo.put(tmd.topic, tmd)} elsewarn("Error while fetching metadata [%s] for topic [%s]: %s ".format(tmd, tmd.topic, ErrorMapping.exceptionFor(tmd.errorCode).getClass))tmd.partitionsMetadata.foreach(pmd =>{if (pmd.errorCode != ErrorMapping.NoError && pmd.errorCode == ErrorMapping.LeaderNotAvailableCode) {warn("Error while fetching metadata %s for topic partition [%s,%d]: [%s]".format(pmd, tmd.topic, pmd.partitionId,ErrorMapping.exceptionFor(pmd.errorCode).getClass))} // any other error code (e.g. ReplicaNotAvailable) can be ignored since the producer does not need to access the replica and isr metadata })})producerPool.updateProducer(topicsMetadata)}

ClientUtils.fetchTopicMetadata方法代碼:

def fetchTopicMetadata(topics: Set[String], brokers: Seq[Broker], producerConfig: ProducerConfig, correlationId: Int): TopicMetadataResponse = {var fetchMetaDataSucceeded: Boolean = falsevar i: Int = 0val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion, correlationId, producerConfig.clientId, topics.toSeq)var topicMetadataResponse: TopicMetadataResponse = nullvar t: Throwable = nullval shuffledBrokers = Random.shuffle(brokers) //生成隨機(jī)數(shù)while(i ProducerPool的updateProducer def updateProducer(topicMetadata: Seq[TopicMetadata]) {val newBrokers = new collection.mutable.HashSet[Broker]topicMetadata.foreach(tmd => {tmd.partitionsMetadata.foreach(pmd => {if(pmd.leader.isDefined)newBrokers+=(pmd.leader.get)})})lock synchronized {newBrokers.foreach(b => {if(syncProducers.contains(b.id)){syncProducers(b.id).close()syncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))} elsesyncProducers.put(b.id, ProducerPool.createSyncProducer(config, b))})}}

當(dāng)我們啟動kafka broker后,并且大量producer和consumer時,經(jīng)常會報如下異常信息。

  • root@lizhitao:/opt/soft$ Closing socket connection to 192.168.11.166
  • 復(fù)制代碼




    筆者也是經(jīng)常很長時間看源碼分析,才明白了為什么ProducerConfig配置信息里面并不要求使用者提供完整的kafka集群的broker信息,而是任選一個或幾個即可。因為他會通過您選擇的broker和topics信息而獲取最新的所有的broker信息。
    值得了解的是用于發(fā)送TopicMetadataRequest的SyncProducer雖然是用ProducerPool.createSyncProducer方法建出來的,但用完并不還回ProducerPool,而是直接Close.


    重難點理解:
    刷新metadata并不僅在第一次初始化時做。為了能適應(yīng)kafka broker運(yùn)行中因為各種原因掛掉、paritition改變等變化,
    eventHandler會定期的再去刷新一次該metadata,刷新的間隔用參數(shù)topic.metadata.refresh.interval.ms定義,默認(rèn)值是10分鐘。
    這里有三點需要強(qiáng)調(diào):

    客戶端調(diào)用send, 才會新建SyncProducer,只有調(diào)用send才會去定期刷新metadata在每次取metadata時,kafka會新建一個SyncProducer去取metadata,邏輯處理完后再close。根據(jù)當(dāng)前SyncProducer(一個Broker的連接)取得的最新的完整的metadata,刷新ProducerPool中到broker的連接.每10分鐘的刷新會直接重新把到每個broker的socket連接重建,意味著在這之后的第一個請求會有幾百毫秒的延遲。如果不想要該延遲,把topic.metadata.refresh.interval.ms值改為-1,這樣只有在發(fā)送失敗時,才會重新刷新。Kafka的集群中如果某個partition所在的broker掛了,可以檢查錯誤后重啟重新加入集群,手動做rebalance,producer的連接會再次斷掉,直到rebalance完成,那么刷新后取到的連接著中就會有這個新加入的broker。


    說明:每個SyncProducer實例化對象會建立一個socket連接


    特別注意:
    在ClientUtils.fetchTopicMetadata調(diào)用完成后,回到BrokerPartitionInfo.updateInfo繼續(xù)執(zhí)行,在其末尾,pool會根據(jù)上面取得的最新的metadata建立所有的SyncProducer,即Socket通道producerPool.updateProducer(topicsMetadata)

    在ProducerPool中,SyncProducer的數(shù)目是由該topic的partition數(shù)目控制的,即每一個SyncProducer對應(yīng)一個broker,內(nèi)部封了一個到該broker的socket連接。每次刷新時,會把已存在SyncProducer給close掉,即關(guān)閉socket連接,然后新建SyncProducer,即新建socket連接,去覆蓋老的。
    如果不存在,則直接創(chuàng)建新的。

    ?

    轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/4182001.html

    總結(jié)

    以上是生活随笔為你收集整理的apache kafka源码分析-Producer分析---转载的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。