Kafka Shell Lag
Kafka Shell Lag
kafka 版本:2.1.0
前言
在生產(chǎn)環(huán)境中,比如你正在使用group kafka-lag消費(fèi)某topic內(nèi)的數(shù)據(jù)。目前你沒(méi)有搭建對(duì)應(yīng)的監(jiān)控系統(tǒng),你如何去查看對(duì)應(yīng)partition 的堆積信息呢?很多人都會(huì)去使用這個(gè)命令:
# 正常使用
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default
# 系統(tǒng)存在kerberos認(rèn)證使用
kafka-consumer-groups --bootstrap-server master:9092 --describe --group default --command-config /home/xiahu/client.properties

client.properties 內(nèi)容如下:
security.protocol=PLAINTEXT sasl.mechanism=GSSAPI sasl.kerberos.service.name=kafka沒(méi)錯(cuò),今天我們就來(lái)研究一下這個(gè)命令,先從kafka-consumer-groups啟動(dòng)腳本看起
1. kafka-consumer-groups.sh
# 該腳本只是簡(jiǎn)單的去調(diào)用了另外一個(gè)腳本kafka-run-class.sh,並將參數(shù)傳遞過(guò)去
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ConsumerGroupCommand "$@"

2. kafka-run-class.sh
# 這個(gè)腳本內(nèi)的內(nèi)容太多了,其他的我也沒(méi)看,但是你所需要明白的是: # 在命令行執(zhí)行: kafka-consumer-groups --bootstrap-server master:9092 --describe --group default # 最終調(diào)用:kafka.admin.ConsumerGroupCommand --bootstrap-server master:9092 --describe --group default # 所以主要看源碼:kafka.admin.ConsumerGroupCommand 這個(gè)類 if [ "x$DAEMON_MODE" = "xtrue" ]; thenecho $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" elseexec $JAVA $KAFKA_HEAP_OPTS $KAFKA_JVM_PERFORMANCE_OPTS $KAFKA_GC_LOG_OPTS $KAFKA_JMX_OPTS $KAFKA_LOG4J_OPTS -cp $CLASSPATH $KAFKA_OPTS "$@" fi3. ConsumerGroupCommand
// Entry point of kafka.admin.ConsumerGroupCommand. Parses the command line,
// enforces exactly one action flag, then dispatches to ConsumerGroupService.
def main(args: Array[String]) {
  val opts = new ConsumerGroupCommandOptions(args)
  if (args.length == 0)
    CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, delete consumer group info, or reset consumer group offsets.")
  // Exactly one of --list / --describe / --delete / --reset-offsets must be given.
  val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt, opts.resetOffsetsOpt).count(opts.options.has)
  if (actions != 1)
    CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete, --reset-offsets")
  // Validate argument combinations.
  opts.checkArgs()
  // Build the ConsumerGroupService from the parsed options.
  val consumerGroupService = new ConsumerGroupService(opts)
  try {
    if (opts.options.has(opts.listOpt))
      consumerGroupService.listGroups().foreach(println(_))
    else if (opts.options.has(opts.describeOpt))
      // We are investigating kafka lag here, so this is the interesting path.
      consumerGroupService.describeGroup()
    else if (opts.options.has(opts.deleteOpt))
      ... }
4. describeGroup()
// Describes a single consumer group. With no sub-option (or --offsets) it prints
// per-partition offset/lag rows; --members prints member info; otherwise state.
def describeGroup(): Unit = {
  // Read options from the parsed command line.
  val group = opts.options.valuesOf(opts.groupOpt).asScala.head
  val membersOptPresent = opts.options.has(opts.membersOpt)
  val stateOptPresent = opts.options.has(opts.stateOpt)
  val offsetsOptPresent = opts.options.has(opts.offsetsOpt)
  val subActions = Seq(membersOptPresent, offsetsOptPresent, stateOptPresent).count(_ == true)
  if (subActions == 0 || offsetsOptPresent) {
    // The kafka lag query is encapsulated in collectGroupOffsets().
    val offsets = collectGroupOffsets()
    printOffsets(group, offsets._1, offsets._2)
  } else if (membersOptPresent) {
    val members = collectGroupMembers(opts.options.has(opts.verboseOpt))
    printMembers(group, members._1, members._2, opts.options.has(opts.verboseOpt))
  } else
    printState(group, collectGroupState()) }
5. collectGroupOffsets()
// Collects, per partition, the committed offset of the group: one row per
// partition owned by a live member, plus rows for partitions that have a
// committed offset but no active consumer. Returns (group state, rows).
def collectGroupOffsets(): (Option[String], Option[Seq[PartitionAssignmentState]]) = {
  val groupId = opts.options.valueOf(opts.groupOpt)
  // AdminClient.describeConsumerGroups returns, per groupId, a
  // ConsumerGroupDescription: "A detailed description of a single consumer group
  // in the cluster" (coordinator, members, state, ...).
  val consumerGroup = adminClient.describeConsumerGroups(List(groupId).asJava,
    withTimeoutMs(new DescribeConsumerGroupsOptions())).describedGroups.get(groupId).get
  val state = consumerGroup.state
  // Map<TopicPartition, OffsetAndMetadata> for this group:
  //   TopicPartition    — topic + partition
  //   OffsetAndMetadata — the group's committed offset (currentOffset) + metadata.
  // topic + partition + groupId uniquely determine one currentOffset.
  val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
  var assignedTopicPartitions = ListBuffer[TopicPartition]()
  // Drop members with an empty assignment, then build one row per assigned
  // partition, pairing each TopicPartition with its committed currentOffset.
  val rowsWithConsumer = consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty)
    .toSeq.sortWith(_.assignment.topicPartitions.size > _.assignment.topicPartitions.size)
    .flatMap { consumerSummary =>
      val topicPartitions = consumerSummary.assignment.topicPartitions.asScala
      assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
      val partitionOffsets = consumerSummary.assignment.topicPartitions.asScala.map { topicPartition =>
        topicPartition -> committedOffsets.get(topicPartition).map(_.offset)
      }.toMap
      // The interesting part: see collectConsumerAssignment below.
      collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), topicPartitions.toList,
        partitionOffsets, Some(s"${consumerSummary.consumerId}"), Some(s"${consumerSummary.host}"),
        Some(s"${consumerSummary.clientId}"))
    }
  // Partitions with a committed offset but no live consumer: reported with "-"
  // placeholders for consumerId/host/clientId.
  val rowsWithoutConsumer =
    committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
      case (topicPartition, offset) =>
        collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), Seq(topicPartition),
          Map(topicPartition -> Some(offset.offset)), Some(MISSING_COLUMN_VALUE),
          Some(MISSING_COLUMN_VALUE), Some(MISSING_COLUMN_VALUE))
    }
  (Some(state.toString), Some(rowsWithConsumer ++ rowsWithoutConsumer))}
6. collectConsumerAssignment
// Builds PartitionAssignmentState rows for one consumer's assignment.
private def collectConsumerAssignment(group: String,
                                      coordinator: Option[Node],
                                      topicPartitions: Seq[TopicPartition],
                                      getPartitionOffset: TopicPartition => Option[Long],
                                      consumerIdOpt: Option[String],
                                      hostOpt: Option[String],
                                      clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
  // Edge case: no partitions — emit a single placeholder row with empty
  // topic/partition/offset fields.
  if (topicPartitions.isEmpty) {
    Array[PartitionAssignmentState](
      PartitionAssignmentState(group, coordinator, None, None, None, getLag(None, None), consumerIdOpt, hostOpt, clientIdOpt, None))
  }
  else
    // Main path: one row per partition, sorted by partition number.
    describePartitions(group, coordinator, topicPartitions.sortBy(_.partition), getPartitionOffset, consumerIdOpt, hostOpt, clientIdOpt) }
7. describePartitions
// Computes, per partition, currentOffset / logEndOffset / lag and wraps them
// into PartitionAssignmentState rows.
private def describePartitions(group: String,
                               coordinator: Option[Node],
                               topicPartitions: Seq[TopicPartition],
                               getPartitionOffset: TopicPartition => Option[Long],
                               consumerIdOpt: Option[String],
                               hostOpt: Option[String],
                               clientIdOpt: Option[String]): Array[PartitionAssignmentState] = {
  def getDescribePartitionResult(topicPartition: TopicPartition, logEndOffsetOpt: Option[Long]): PartitionAssignmentState = {
    val offset = getPartitionOffset(topicPartition)
    PartitionAssignmentState(group, coordinator, Option(topicPartition.topic), Option(topicPartition.partition), offset,
      getLag(offset, logEndOffsetOpt), consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt)
  }
  // getLogEndOffsets:
  // 1. builds a KafkaConsumer from bootstrap-server / groupId;
  // 2. calls KafkaConsumer.endOffsets for the partitions, yielding each
  //    partition's largest offset (logEndOffset);
  // 3. lag = logEndOffset - currentOffset, wrapped into PartitionAssignmentState.
  getLogEndOffsets(topicPartitions).map {
    case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset))
    case (topicPartition, _) => getDescribePartitionResult(topicPartition, None)
  }.toArray }
說(shuō)明
在kafka內(nèi),有以下幾個(gè)概念
分別說(shuō)明:
1. broker
broker可以理解為一臺(tái)安裝kafka的機(jī)器,多個(gè)broker構(gòu)成kafka集群,如果只有一個(gè)broker,那么這個(gè)kafka服務(wù)是單機(jī)的2. topic
topic 翻譯過(guò)來(lái)為主題. 一個(gè)kafka集群下有多個(gè)topic3. partition
partition翻譯為分區(qū),hive里面就有分區(qū)的概念,與hive的分區(qū)類似,一個(gè)topic 內(nèi)有多個(gè)partition4. groupId
結(jié)合實(shí)際說(shuō)明:
目前,我有 topic: kafka_lag ,該topic有兩個(gè)partition,目前往topic內(nèi)生產(chǎn)10000條數(shù)據(jù),按照默認(rèn)的分區(qū)測(cè)試,partition 0,partition 1 分別有 5000 條數(shù)據(jù).
除此之外,我有兩個(gè)group:kafka-consumer-lag-1,kafka-consumer-lag-2
首先:我使用kafka-consumer-lag-1 去消費(fèi)topic內(nèi)的數(shù)據(jù),假如 partition 0,1 分別消費(fèi)2000 條,則offset 如下:
| group | topic | partition | currentOffset | lag | endOffset |
| kafka-consumer-lag-1 | kafka_lag | 0 | 2000 | 3000 | 5000 |
| kafka-consumer-lag-1 | kafka_lag | 1 | 2000 | 3000 | 5000 |
| kafka-consumer-lag-2 | kafka_lag | 0 | 0 | 5000 | 5000 |
| kafka-consumer-lag-2 | kafka_lag | 1 | 0 | 5000 | 5000 |
然后,我用 kafka-consumer-lag-2 去消費(fèi)topic內(nèi)的數(shù)據(jù),partition 0,1 分區(qū)消費(fèi)4000 ,則offset如下:
| group | topic | partition | currentOffset | lag | endOffset |
| kafka-consumer-lag-1 | kafka_lag | 0 | 2000 | 3000 | 5000 |
| kafka-consumer-lag-1 | kafka_lag | 1 | 2000 | 3000 | 5000 |
| kafka-consumer-lag-2 | kafka_lag | 0 | 4000 | 1000 | 5000 |
| kafka-consumer-lag-2 | kafka_lag | 1 | 4000 | 1000 | 5000 |
總結(jié)
由上面的數(shù)據(jù)展示可知:
topic + partition 對(duì)應(yīng)唯一的endOffset
topic + partition + group 對(duì)應(yīng)唯一的currentOffset
其實(shí)kafka 提供的 kafka-consumer-groups.sh 就是使用的這個(gè)原理
由于kafka 源碼是使用scala寫(xiě)的,沒(méi)了解過(guò)scala的人看起來(lái)會(huì)比較困難,我用java重新給邏輯實(shí)現(xiàn)了一遍,代碼如下:
package com.clb.lag;import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.ConsumerGroupDescription; import org.apache.kafka.clients.admin.MemberDescription; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.ConsumerGroupState; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import java.util.*; import java.util.function.Consumer;/*** @author Xiahu* @create 2021/1/11*/ public class KafkaOffsetTool {private AdminClient adminClient;private static final String MISSING_COLUMN_VALUE = "-";private KafkaConsumer consumer;public KafkaOffsetTool() {Properties properties = new Properties();properties.put("bootstrap.servers", "node2:9092");//kerberos認(rèn)證需要自己實(shí)現(xiàn)if (false) {properties.put("sasl.kerberos.service.name", "kafka");properties.put("sasl.mechanism", "GSSAPI");properties.put("security.protocol", "PLAINTEXT");}this.adminClient = AdminClient.create(properties);}public List<PartitionOffsetState> collectGroupOffsets(String group) throws Exception {List<PartitionOffsetState> result = new ArrayList<>();List<String> groupId = Arrays.asList(group);Map<String, KafkaFuture<ConsumerGroupDescription>> describedGroups = adminClient.describeConsumerGroups(groupId).describedGroups();ConsumerGroupDescription consumerGroup = describedGroups.get(group).get();ConsumerGroupState state = consumerGroup.state();Map<TopicPartition, OffsetAndMetadata> committedOffsets = getCommitsOffsets(group);Collection<MemberDescription> memberDescriptions = consumerGroup.members();Set<MemberDescription> memberDescriptionSet = new HashSet<>();Iterator<MemberDescription> iterator = memberDescriptions.iterator();while (iterator.hasNext()) {MemberDescription memberDescription = iterator.next();if (null != 
memberDescription.assignment().topicPartitions()) {memberDescriptionSet.add(memberDescription);}}memberDescriptionSet.stream().sorted(new Comparator<MemberDescription>() {@Overridepublic int compare(MemberDescription o1, MemberDescription o2) {if (o1.assignment().topicPartitions().size() >= o2.assignment().topicPartitions().size()) {return 1;} else {return -1;}}}).forEach(new Consumer<MemberDescription>() {@Overridepublic void accept(MemberDescription memberDescription) {Set<TopicPartition> topicPartitions = memberDescription.assignment().topicPartitions();for (TopicPartition tp : topicPartitions) {long offset = committedOffsets.get(tp).offset();PartitionOffsetState partitionOffsetState = new PartitionOffsetState();partitionOffsetState.setGroup(group);partitionOffsetState.setCoordinator(consumerGroup.coordinator().toString());partitionOffsetState.setHost(memberDescription.host());partitionOffsetState.setClientId(memberDescription.clientId());partitionOffsetState.setConsumerId(memberDescription.consumerId());partitionOffsetState.setPartition(tp.partition());partitionOffsetState.setTopic(tp.topic());partitionOffsetState.setOffset(offset);result.add(partitionOffsetState);}}});//封裝committedOffsetsIterator<Map.Entry<TopicPartition, OffsetAndMetadata>> entryIterator = committedOffsets.entrySet().iterator();while (entryIterator.hasNext()) {Map.Entry<TopicPartition, OffsetAndMetadata> entry = entryIterator.next();PartitionOffsetState partitionOffsetState = new PartitionOffsetState();partitionOffsetState.setGroup(group);partitionOffsetState.setCoordinator(consumerGroup.coordinator().toString());partitionOffsetState.setHost(MISSING_COLUMN_VALUE);partitionOffsetState.setClientId(MISSING_COLUMN_VALUE);partitionOffsetState.setConsumerId(MISSING_COLUMN_VALUE);partitionOffsetState.setPartition(entry.getKey().partition());partitionOffsetState.setTopic(entry.getKey().topic());partitionOffsetState.setOffset(entry.getValue().offset());result.add(partitionOffsetState);}return 
result;}private Map<TopicPartition, OffsetAndMetadata> getCommitsOffsets(String groupId) throws Exception {Map<TopicPartition, OffsetAndMetadata> result = adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();return result;}public List<PartitionOffsetState> getLag(List<PartitionOffsetState> partitionOffsetStateList,String groupId) {getConsumer(new Properties(), groupId);List<TopicPartition> topicPartitionList = new ArrayList<>();for (PartitionOffsetState partitionOffset : partitionOffsetStateList) {topicPartitionList.add(new TopicPartition(partitionOffset.getTopic(), partitionOffset.getPartition()));}Map<TopicPartition, Long> map = consumer.endOffsets(topicPartitionList);for (PartitionOffsetState partitionOffset : partitionOffsetStateList) {for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {if (entry.getKey().topic().equals(partitionOffset.getTopic()) && entry.getKey().partition() == partitionOffset.getPartition()) {partitionOffset.setLag(entry.getValue() - partitionOffset.getOffset());partitionOffset.setLogEndOffset(entry.getValue());}}}return partitionOffsetStateList;}private KafkaConsumer getConsumer(Properties prop, String groupId) {if (consumer == null) {createConsumer(prop, groupId);}return consumer;}public void createConsumer(Properties prop, String groupId) {//kerberos認(rèn)證需要自己實(shí)現(xiàn)if (false) {System.setProperty("java.security.krb5.conf", prop.getProperty(NuwaConstant.KERBEROS_KRB5));System.setProperty("java.security.auth.login.config", prop.getProperty(NuwaConstant.KERBEROS_LOGIN_CONFIG));prop.put(NuwaConstant.KAFKA_SECURITY_PROTOCOL, prop.getProperty(NuwaConstant.KAFKA_SECURITY_PROTOCOL));prop.put(NuwaConstant.KAFKA_SASL_MECHANISM, prop.getProperty(NuwaConstant.KAFKA_SASL_MECHANISM));prop.put(NuwaConstant.KAFKA_SASL_KERBEROS_SERVICE_NAME, prop.getProperty(NuwaConstant.KAFKA_SASL_KERBEROS_SERVICE_NAME));}String deserializer = StringDeserializer.class.getName();String broker = 
"node1:9092";prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer);prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);consumer = new KafkaConsumer(prop);}public static void main(String[] args) throws Exception {KafkaOffsetTool kafkaOffsetTool = new KafkaOffsetTool();List<PartitionOffsetState> partitionOffsetStates = kafkaOffsetTool.collectGroupOffsets("kafka-lag");partitionOffsetStates = kafkaOffsetTool.getLag(partitionOffsetStates,"kafka-lag");System.out.println(partitionOffsetStates);} }PartitionOffsetState
package com.clb.lag;

import lombok.Data;

/**
 * One row of consumer-group offset information for a single topic-partition:
 * the group's committed offset, the partition's log end offset, and the lag
 * between them. Getters/setters are generated by Lombok's @Data.
 *
 * @author Xiahu
 * @create 2021/1/11
 */
@Data
public class PartitionOffsetState {
    // consumer group id
    private String group;
    // group coordinator node (Node.toString())
    private String coordinator;
    private String topic;
    private int partition;
    // committed offset of the group on this partition (currentOffset)
    private Long offset;
    // logEndOffset - offset
    private Long lag;
    // "-" when no live consumer owns the partition
    private String consumerId;
    private String host;
    private String clientId;
    // largest offset in the partition (endOffset)
    private Long logEndOffset;
}
總結(jié)
以上是生活随笔為你收集整理的Kafka Shell Lag的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: android7.1root工具,And
- 下一篇: IT人必看!2018年上半年云栖大会30