日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

kafka_consumer_消费原理介绍

發(fā)布時(shí)間:2024/2/28 编程问答 39 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka_consumer_消费原理介绍 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

文章目錄

    • 簡(jiǎn)介
      • consumer使用樣例
      • consumer 如何和server通信,
      • consumer的offset管理
        • __consumer_offsets
      • consumer group的rebalance
        • 關(guān)于coordinator在join-group的等待時(shí)間

簡(jiǎn)介

本文根據(jù)kafka-1.0.0
??kafka的consumer相對(duì)來(lái)說(shuō)比producer復(fù)雜一些,因?yàn)樗麪砍兜綄?duì)offset的提交,consumer group的管理等操作。

consumer使用樣例

Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "test_group");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", 1000);props.put("session.timeout.ms", 120000);props.put("max.poll.interval.ms",600000);props.put("max.poll.records", 100);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);while (true) {ConsumerRecords<String, String> records = consumer.poll(timeout);print(records, table);

可以看到kafka的consumer使用起來(lái)也是很簡(jiǎn)單的,只能說(shuō)明包裝的好,里面也是包含了很多東西。

consumer 如何和server通信,

??consumer在調(diào)用上面的poll()操作的時(shí)候會(huì)觸發(fā)offset的提交動(dòng)作,也就是只有到下一次poll()的時(shí)候本次消費(fèi)的offset才會(huì)被提交(當(dāng)然,這是針對(duì)使用自動(dòng)提交的設(shè)置的時(shí)候),同時(shí)還會(huì)檢查是否需要rebalance等操作(這個(gè)等下面講)。
??當(dāng)然,consumer和server之間肯定要有一個(gè)心跳檢測(cè),來(lái)讓server知道consumer還是活著的,以免consumer被從group當(dāng)中踢出去。
??因?yàn)閏onsumer主線(xiàn)程在處理數(shù)據(jù)的時(shí)候后可能會(huì)消耗比較長(zhǎng)的時(shí)間,所以如果使用consumer主線(xiàn)程來(lái)完成心跳維持的話(huà)時(shí)間不容易控制時(shí)間,很有可能因?yàn)樘幚頂?shù)據(jù)消耗時(shí)間較長(zhǎng)而導(dǎo)致超時(shí)被server端誤認(rèn)為死掉了。
??因此,kafka在consumer端引入了一個(gè)heartbeat 線(xiàn)程來(lái)輔助維持consumer的心跳,這個(gè)線(xiàn)程只做心跳維護(hù)和一些狀態(tài)的同步,比如當(dāng)前group處于rebalance狀態(tài)heartbeat線(xiàn)程再發(fā)送心跳請(qǐng)求時(shí)會(huì)返回正在rebalance的狀態(tài),這個(gè)時(shí)候hearbeat線(xiàn)程會(huì)在consumer端和consumer共享的變量設(shè)置標(biāo)志位,標(biāo)識(shí)正在進(jìn)行rebalance,這樣的話(huà),consumer在進(jìn)行下一次poll()的時(shí)候會(huì)檢查這個(gè)標(biāo)志位,并作出一些動(dòng)作。

consumer的offset管理

??每個(gè)consumer group中的consumer的因?yàn)橐M(fèi)對(duì)應(yīng)的數(shù)據(jù)需要記錄offset,同時(shí)server端也要記錄這些offset以便于consumer端在短暫重啟以后還能保持繼續(xù)消費(fèi)。server端是如何記錄這些數(shù)據(jù)的呢,在server端會(huì)有一個(gè)叫 Coordinator的組件來(lái)負(fù)責(zé)管理consumer group

__consumer_offsets

__consumer_offsets是 Kafka 內(nèi)部使用的一個(gè) topic,專(zhuān)門(mén)用來(lái)存儲(chǔ) group 消費(fèi)的情況,默認(rèn)情況下有50個(gè) partition,每個(gè) partition 三副本。
??每個(gè)consumer group 的元數(shù)據(jù)存儲(chǔ)到那個(gè)partition有一些規(guī)則,通過(guò)這個(gè) abs(GroupId.hashCode()) % NumPartitions 來(lái)計(jì)算出一個(gè)值,(其中,NumPartitions 是 __consumer_offsets 的 partition 數(shù),默認(rèn)是50個(gè)),這個(gè)值代表了 __consumer_offsets 的一個(gè) partition,而這個(gè) partition 的 leader 即為這個(gè) Group 要交互的 GroupCoordinator 所在的節(jié)點(diǎn)。

??下面這個(gè)是test的一個(gè)配置,可以看到topic的ReplicationFactor 設(shè)置成了1,這就導(dǎo)致了一旦有一個(gè)broker掛了,如果這個(gè)broker對(duì)應(yīng)的partition對(duì)應(yīng)的存儲(chǔ)的有現(xiàn)有consuemr group的元數(shù)據(jù),那么對(duì)應(yīng)的offset元數(shù)據(jù)就丟了,也就沒(méi)有辦法正常消費(fèi)了。

[deploy@b-kk-search-03 kafka_2.11-1.0.0]$ bin/kafka-topics.sh --zookeeper 10.9.10.17:2182 --topic __consumer_offsets --describe Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producerTopic: __consumer_offsets Partition: 0 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 1 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 2 Leader: 1 Replicas: 1 Isr: 1Topic: __consumer_offsets Partition: 3 Leader: 0 Replicas: 0 Isr: 0Topic: __consumer_offsets Partition: 4 Leader: 1 Replicas: 1 Isr: 1

consumer group的rebalance

??針對(duì)一個(gè)consumer group ,kafka server端的coordinator會(huì)在有新的consuemr加入或者現(xiàn)有的consumer 退出的時(shí)候觸發(fā)rebalance,在rebalance階段,所有現(xiàn)有的consumer都要重新加入這個(gè)group。重新加入分為兩個(gè)階段,1.join-group 請(qǐng)求以加入 group,2.然后再發(fā)送 sync-group 請(qǐng)求以獲取被分配的partition等信息。

??coordinator端接收到第一個(gè)join-group請(qǐng)求以后,會(huì)進(jìn)入rebalance階段,然后等待所有的consuemr(指在此之前通過(guò)heartbeat監(jiān)測(cè)到的存活的)發(fā)起join-group,等待是有時(shí)間限制的,關(guān)于時(shí)間限制下面會(huì)單獨(dú)講一講,因?yàn)樗行┢婀帧5人械腸onsumer都發(fā)起了join-group以后,coordinator會(huì)從中選取一個(gè)consuemr作為leader來(lái)進(jìn)行分配(分配策略放在consumer端主要是為了將來(lái)拓展的時(shí)候更加靈活),coordinator把所有consumer的元數(shù)據(jù)信息都發(fā)送給這個(gè)leader,由他來(lái)判斷如何分配。當(dāng)leader將分配結(jié)果通過(guò)sync-group發(fā)送到coordiantor的時(shí)候,coordinator會(huì)在其他的consumer的sync-group中將分配結(jié)果下發(fā)。然后就完成了rebalance。

關(guān)于coordinator在join-group的等待時(shí)間

??在進(jìn)入rebalance階段的時(shí)候會(huì)等待所有之前存活的counsumer進(jìn)行join-group,這個(gè)等待時(shí)間是多少,在查看kafka官方文檔的時(shí)候有這樣的配置。

rebalance.timeout.ms The maximum allowed time for each worker to join the group once a rebalance has begun. This is basically a limit on the amount of time needed for all tasks to flush any pending dataand commit offsets. If the timeout is exceeded, then the worker will be removed from the group, which will cause offset commit failures. default 60000

這里覺(jué)得使用的是這個(gè)值,但是我們?cè)趯?shí)際使用中發(fā)現(xiàn)不太對(duì)。
rebalance的等待時(shí)間達(dá)到比60s要長(zhǎng),實(shí)際上和max.poll.interval.ms 時(shí)間一致,后來(lái)翻看了不少源碼才發(fā)現(xiàn)使用的就是 max.poll.interval.ms,估計(jì)rebalance.timeout.ms暫時(shí)還沒(méi)有用到。

總結(jié)

以上是生活随笔為你收集整理的kafka_consumer_消费原理介绍的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。