Apache Kafka源码剖析:第5篇 业务API处理
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
之前說過了,請求到達(dá)業(yè)務(wù)線程池后,會(huì)被處理,但是如何被處理呢?這就是接下來要說的。
-----------------------------------------------------------------------------------------------
業(yè)務(wù)線程屬于 Kafka的API層,對請求的處理通過調(diào)用KafkaAPIs中的方法實(shí)現(xiàn)!
1 KafkaRequestHandler
首先我們得知道這個(gè)業(yè)務(wù)線程池是怎么創(chuàng)建的
回到KafkaServer.scala
requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,config.numIoThreads) class KafkaRequestHandlerPool(val brokerId: Int,val requestChannel: RequestChannel,val apis: KafkaApis,time: Time,numThreads: Int) extends Logging with KafkaMetricsGroup {/* a meter to track the average free capacity of the request handlers */private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "val runnables = new Array[KafkaRequestHandler](numThreads)for(i <- 0 until numThreads) {//創(chuàng)建這么多個(gè)runnable,放到線程里執(zhí)行runnables(i) = new KafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis, time)Utils.daemonThread("kafka-request-handler-" + i, runnables(i)).start()}取出請求執(zhí)行的代碼
def run() {while (true) {try {var req : RequestChannel.Request = nullwhile (req == null) {// We use a single meter for aggregate idle percentage for the thread pool.// Since meter is calculated as total_recorded_value / time_window and// time_window is independent of the number of threads, each recorded idle// time should be discounted by # threads.val startSelectTime = time.nanosecondsreq = requestChannel.receiveRequest(300)val endTime = time.nanosecondsif (req != null)req.requestDequeueTimeNanos = endTimeval idleTime = endTime - startSelectTimeaggregateIdleMeter.mark(idleTime / totalHandlerThreads)}if (req eq RequestChannel.AllDone) {debug("Kafka request handler %d on broker %d received shut down command".format(id, brokerId))latch.countDown()return}trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))apis.handle(req)} catch {case e: FatalExitError =>latch.countDown()Exit.exit(e.statusCode)case e: Throwable => error("Exception when handling request", e)}}}這樣就不難理解了吧,
可見,API層使用kafkaRequestHandlerPool來管理所有的KafkaRequestHandler線程,它是1個(gè)簡易版的線程池,其中創(chuàng)建了多個(gè)KafkaRequestHandler線程。
?
KafkaApis
是Kafka服務(wù)器處理請求的入口類,負(fù)責(zé)將KafkaRequestHandler.Request分發(fā)到不同的handle*()方法里執(zhí)行,見圖:
因?yàn)楹瘮?shù)太多,這里就不展開,后面碰到的時(shí)候再詳細(xì)展開!
?
轉(zhuǎn)載于:https://my.oschina.net/qiangzigege/blog/1507362
總結(jié)
以上是生活随笔為你收集整理的Apache Kafka源码剖析:第5篇 业务API处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2017年本博客知识体系引导(更新至20
- 下一篇: hdu 6034 B - Balala