kafka_consumer_消费原理介绍
文章目錄
- 簡(jiǎn)介
- consumer使用樣例
- consumer 如何和server通信,
- consumer的offset管理
- __consumer_offsets
- consumer group的rebalance
- 關(guān)于coordinator在join-group的等待時(shí)間
簡(jiǎn)介
本文根據(jù)kafka-1.0.0
??kafka的consumer相對(duì)來(lái)說(shuō)比producer復(fù)雜一些,因?yàn)樗麪砍兜綄?duì)offset的提交,consumer group的管理等操作。
consumer使用樣例
Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "test_group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", 1000);props.put("session.timeout.ms", 120000);props.put("max.poll.interval.ms",600000);props.put("max.poll.records", 100);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);while (true) {ConsumerRecords<String, String> records = consumer.poll(timeout);print(records, table);可以看到kafka的consumer使用起來(lái)也是很簡(jiǎn)單的,只能說(shuō)明包裝的好,里面也是包含了很多東西。
consumer 如何和server通信,
??consumer在調(diào)用上面的poll()操作的時(shí)候會(huì)觸發(fā)offset的提交動(dòng)作,也就是只有到下一次poll()的時(shí)候本次消費(fèi)的offset才會(huì)被提交(當(dāng)然,這是針對(duì)使用自動(dòng)提交的設(shè)置的時(shí)候),同時(shí)還會(huì)檢查是否需要rebalance等操作(這個(gè)等下面講)。
??當(dāng)然,consumer和server之間肯定要有一個(gè)心跳檢測(cè),來(lái)讓server知道consumer還是活著的,以免consumer被從group當(dāng)中踢出去。
??因?yàn)閏onsumer主線(xiàn)程在處理數(shù)據(jù)的時(shí)候后可能會(huì)消耗比較長(zhǎng)的時(shí)間,所以如果使用consumer主線(xiàn)程來(lái)完成心跳維持的話(huà)時(shí)間不容易控制時(shí)間,很有可能因?yàn)樘幚頂?shù)據(jù)消耗時(shí)間較長(zhǎng)而導(dǎo)致超時(shí)被server端誤認(rèn)為死掉了。
??因此,kafka在consumer端引入了一個(gè)heartbeat 線(xiàn)程來(lái)輔助維持consumer的心跳,這個(gè)線(xiàn)程只做心跳維護(hù)和一些狀態(tài)的同步,比如當(dāng)前group處于rebalance狀態(tài)heartbeat線(xiàn)程再發(fā)送心跳請(qǐng)求時(shí)會(huì)返回正在rebalance的狀態(tài),這個(gè)時(shí)候hearbeat線(xiàn)程會(huì)在consumer端和consumer共享的變量設(shè)置標(biāo)志位,標(biāo)識(shí)正在進(jìn)行rebalance,這樣的話(huà),consumer在進(jìn)行下一次poll()的時(shí)候會(huì)檢查這個(gè)標(biāo)志位,并作出一些動(dòng)作。
consumer的offset管理
??每個(gè)consumer group中的consumer的因?yàn)橐M(fèi)對(duì)應(yīng)的數(shù)據(jù)需要記錄offset,同時(shí)server端也要記錄這些offset以便于consumer端在短暫重啟以后還能保持繼續(xù)消費(fèi)。server端是如何記錄這些數(shù)據(jù)的呢,在server端會(huì)有一個(gè)叫 Coordinator的組件來(lái)負(fù)責(zé)管理consumer group
__consumer_offsets
__consumer_offsets是 Kafka 內(nèi)部使用的一個(gè) topic,專(zhuān)門(mén)用來(lái)存儲(chǔ) group 消費(fèi)的情況,默認(rèn)情況下有50個(gè) partition,每個(gè) partition 三副本。
??每個(gè)consumer group 的元數(shù)據(jù)存儲(chǔ)到那個(gè)partition有一些規(guī)則,通過(guò)這個(gè) abs(GroupId.hashCode()) % NumPartitions 來(lái)計(jì)算出一個(gè)值,(其中,NumPartitions 是 __consumer_offsets 的 partition 數(shù),默認(rèn)是50個(gè)),這個(gè)值代表了 __consumer_offsets 的一個(gè) partition,而這個(gè) partition 的 leader 即為這個(gè) Group 要交互的 GroupCoordinator 所在的節(jié)點(diǎn)。
??下面這個(gè)是test的一個(gè)配置,可以看到topic的ReplicationFactor 設(shè)置成了1,這就導(dǎo)致了一旦有一個(gè)broker掛了,如果這個(gè)broker對(duì)應(yīng)的partition對(duì)應(yīng)的存儲(chǔ)的有現(xiàn)有consuemr group的元數(shù)據(jù),那么對(duì)應(yīng)的offset元數(shù)據(jù)就丟了,也就沒(méi)有辦法正常消費(fèi)了。
[deploy@b-kk-search-03 kafka_2.11-1.0.0]$ bin/kafka-topics.sh --zookeeper 10.9.10.17:2182 --topic __consumer_offsets --describe Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producerTopic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 2 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 3 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 4 Leader: 1 Replicas: 1 Isr: 1consumer group的rebalance
??針對(duì)一個(gè)consumer group ,kafka server端的coordinator會(huì)在有新的consuemr加入或者現(xiàn)有的consumer 退出的時(shí)候觸發(fā)rebalance,在rebalance階段,所有現(xiàn)有的consumer都要重新加入這個(gè)group。重新加入分為兩個(gè)階段,1.join-group 請(qǐng)求以加入 group,2.然后再發(fā)送 sync-group 請(qǐng)求以獲取被分配的partition等信息。
??coordinator端接收到第一個(gè)join-group請(qǐng)求以后,會(huì)進(jìn)入rebalance階段,然后等待所有的consuemr(指在此之前通過(guò)heartbeat監(jiān)測(cè)到的存活的)發(fā)起join-group,等待是有時(shí)間限制的,關(guān)于時(shí)間限制下面會(huì)單獨(dú)講一講,因?yàn)樗行┢婀帧5人械腸onsumer都發(fā)起了join-group以后,coordinator會(huì)從中選取一個(gè)consuemr作為leader來(lái)進(jìn)行分配(分配策略放在consumer端主要是為了將來(lái)拓展的時(shí)候更加靈活),coordinator把所有consumer的元數(shù)據(jù)信息都發(fā)送給這個(gè)leader,由他來(lái)判斷如何分配。當(dāng)leader將分配結(jié)果通過(guò)sync-group發(fā)送到coordiantor的時(shí)候,coordinator會(huì)在其他的consumer的sync-group中將分配結(jié)果下發(fā)。然后就完成了rebalance。
關(guān)于coordinator在join-group的等待時(shí)間
??在進(jìn)入rebalance階段的時(shí)候會(huì)等待所有之前存活的counsumer進(jìn)行join-group,這個(gè)等待時(shí)間是多少,在查看kafka官方文檔的時(shí)候有這樣的配置。
rebalance.timeout.ms The maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending dataand commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures. default 60000這里覺(jué)得使用的是這個(gè)值,但是我們?cè)趯?shí)際使用中發(fā)現(xiàn)不太對(duì)。
rebalance的等待時(shí)間達(dá)到比60s要長(zhǎng),實(shí)際上和max.poll.interval.ms 時(shí)間一致,后來(lái)翻看了不少源碼才發(fā)現(xiàn)使用的就是 max.poll.interval.ms,估計(jì)rebalance.timeout.ms暫時(shí)還沒(méi)有用到。
總結(jié)
以上是生活随笔為你收集整理的kafka_consumer_消费原理介绍的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Linux上搭建h2引擎加载h2文件
- 下一篇: kafka_rebalance过长问题排