Apache Kafka Source Code Analysis, Part 5: Business API Handling

As mentioned in the previous post, once a request reaches the business thread pool it gets processed. But how exactly is it processed? That is what this article covers.

-----------------------------------------------------------------------------------------------

The business threads belong to Kafka's API layer; they process each request by calling into methods of KafkaApis.

1 KafkaRequestHandler

First we need to see how this business thread pool is created.

Back in KafkaServer.scala:

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, config.numIoThreads)

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: KafkaApis,
                              time: Time,
                              numThreads: Int) extends Logging with KafkaMetricsGroup {

  /* a meter to track the average free capacity of the request handlers */
  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)

  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
  val runnables = new Array[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    // create numThreads KafkaRequestHandler runnables and start each one on its own daemon thread
    runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)
    Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()
  }
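Note that no ExecutorService is involved: the pool simply builds an array of runnables and starts one named daemon thread per runnable via Utils.daemonThread. A minimal, self-contained sketch of that pattern (the helper name and signature here are hypothetical, not Kafka code) would be:

// Hypothetical sketch: start numThreads named daemon threads, one per runnable.
// `makeRunnable` stands in for `new KafkaRequestHandler(...)`.
def startDaemonWorkers(numThreads: Int, makeRunnable: Int => Runnable): Array[Thread] = {
  val threads = Array.tabulate(numThreads) { i =>
    val t = new Thread(makeRunnable(i), s"kafka-request-handler-$i")
    t.setDaemon(true) // a daemon thread will not keep the JVM alive on its own
    t
  }
  threads.foreach(_.start())
  threads
}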

Here is the code that pulls a request off the channel and executes it:

def run() {
  while (true) {
    try {
      var req: RequestChannel.Request = null
      while (req == null) {
        // We use a single meter for aggregate idle percentage for the thread pool.
        // Since meter is calculated as total_recorded_value / time_window and
        // time_window is independent of the number of threads, each recorded idle
        // time should be discounted by # threads.
        val startSelectTime = time.nanoseconds

        req = requestChannel.receiveRequest(300)
        val endTime = time.nanoseconds
        if (req != null)
          req.requestDequeueTimeNanos = endTime
        val idleTime = endTime - startSelectTime
        aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
      }

      if (req eq RequestChannel.AllDone) {
        debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))
        latch.countDown()
        return
      }
      trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
      apis.handle(req)
    } catch {
      case e: FatalExitError =>
        latch.countDown()
        Exit.exit(e.statusCode)
      case e: Throwable => error("Exception when handling request", e)
    }
  }
}
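This is the classic "poll with timeout, then dispatch" worker loop, with RequestChannel.AllDone acting as a poison pill for shutdown. A stripped-down, self-contained sketch of the same pattern (all names here are hypothetical stand-ins; Kafka's real loop additionally records idle-time metrics) might look like:

import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Hypothetical sentinel playing the role of RequestChannel.AllDone.
case object PoisonPill
// Hypothetical stand-in for RequestChannel.Request.
final case class Work(payload: String)

class WorkerLoop(queue: LinkedBlockingQueue[AnyRef],
                 shutdownLatch: CountDownLatch,
                 handle: Work => Unit) extends Runnable {
  override def run(): Unit = {
    while (true) {
      // Poll with a timeout so the loop wakes up periodically instead of blocking forever
      // (Kafka uses this window to record how long the thread sat idle).
      val item = queue.poll(300, TimeUnit.MILLISECONDS)
      item match {
        case null =>
          // timed out with nothing to do; poll again
        case PoisonPill =>
          // shutdown sentinel: signal completion and leave the loop
          shutdownLatch.countDown()
          return
        case w: Work =>
          // never let a single bad request kill the worker thread
          try handle(w) catch { case e: Throwable => System.err.println(s"error handling $w: $e") }
        case other =>
          System.err.println(s"unexpected item on queue: $other")
      }
    }
  }
}

At shutdown, a pool built on this pattern would enqueue one PoisonPill per worker and then await the latch, which mirrors the role that RequestChannel.AllDone and the per-handler latch play in the real KafkaRequestHandler.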

With the code in front of us this is not hard to follow.

As you can see, the API layer uses KafkaRequestHandlerPool to manage all of the KafkaRequestHandler threads. It is a simple, hand-rolled thread pool that creates multiple KafkaRequestHandler threads, each of which loops over the RequestChannel and hands every request it receives to apis.handle().

2 KafkaApis

This is the entry point for request processing on the Kafka broker. It is responsible for dispatching each RequestChannel.Request to the corresponding handle*() method.

There are too many handler methods to go through one by one, so they are not expanded here; they will be covered in detail when we run into them later.
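To give a feel for its shape: handle() is essentially one large pattern match on the request's API key, with one handle*() method per key. The sketch below is a simplified approximation rather than the exact source; the way the key is extracted and the full set of keys vary between Kafka versions.

// Simplified, version-dependent sketch of the dispatch inside KafkaApis.handle.
def handle(request: RequestChannel.Request): Unit = {
  try {
    request.header.apiKey match {           // older versions read the key via ApiKeys.forId(request.requestId)
      case ApiKeys.PRODUCE       => handleProduceRequest(request)
      case ApiKeys.FETCH         => handleFetchRequest(request)
      case ApiKeys.METADATA      => handleTopicMetadataRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.JOIN_GROUP    => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT     => handleHeartbeatRequest(request)
      // ... dozens more ApiKeys, one handler method each ...
      case key                   => throw new IllegalStateException(s"No handler for api key $key")
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable      =>
      // in the real code an error response is built and sent back to the client here
      error(s"Error when handling request $request", e)
  }
}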

Reposted from: https://my.oschina.net/qiangzigege/blog/1507362
