
Getting Kafka consumer lag in Java

Published: 2025/3/8

Maven dependency

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.1.0</version>
</dependency>
```

Note: kafka-clients must be version 0.10.1.0 or later, because the code calls the endOffsets API introduced in that release.

lag = logSize - offset

The logSize (log end offset) is obtained from the consumer's endOffsets API; the committed offset is obtained from the consumer's committed API.
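To make the formula concrete before the full program, here is a minimal, self-contained sketch of the arithmetic. The partition numbers and offset values below are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class LagFormulaDemo {
    public static void main(String[] args) {
        // Hypothetical log end offsets (logSize) per partition
        Map<Integer, Long> endOffsetMap = new HashMap<>();
        endOffsetMap.put(0, 120L);
        endOffsetMap.put(1, 300L);

        // Hypothetical committed consumer offsets per partition
        Map<Integer, Long> commitOffsetMap = new HashMap<>();
        commitOffsetMap.put(0, 100L);
        commitOffsetMap.put(1, 295L);

        long lagSum = 0L;
        for (Integer partition : endOffsetMap.keySet()) {
            // lag = logSize - committed offset, per partition
            long lag = endOffsetMap.get(partition) - commitOffsetMap.get(partition);
            lagSum += lag;
            System.out.println("partition " + partition + " lag=" + lag);
        }
        System.out.println("total lag=" + lagSum); // 20 + 5 = 25
    }
}
```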

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class KafkaConsumeLagMonitor {

    public static Properties getConsumeProperties(String groupID, String bootstrapServer) {
        Properties props = new Properties();
        props.put("group.id", groupID);
        props.put("bootstrap.servers", bootstrapServer);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Usage: KafkaConsumeLagMonitor <bootstrap.servers> <group.id> <topic>
        String bootstrapServer = args[0];
        String groupID = args[1];
        String topic = args[2];

        Map<Integer, Long> endOffsetMap = new HashMap<>();
        Map<Integer, Long> commitOffsetMap = new HashMap<>();

        Properties consumeProps = getConsumeProperties(groupID, bootstrapServer);
        System.out.println("consumer properties:" + consumeProps);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumeProps);

        // Look up the topic's partitions
        List<TopicPartition> topicPartitions = new ArrayList<>();
        for (PartitionInfo partitionInfo : consumer.partitionsFor(topic)) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }

        // Query the log size (log end offset) of each partition
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
        for (Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
            endOffsetMap.put(entry.getKey().partition(), entry.getValue());
        }
        for (Integer partitionId : endOffsetMap.keySet()) {
            System.out.println(String.format("at %s, topic:%s, partition:%s, logSize:%s",
                    System.currentTimeMillis(), topic, partitionId, endOffsetMap.get(partitionId)));
        }

        // Query the committed consumer offsets; committed() returns null
        // when the group has never committed an offset for a partition
        for (TopicPartition topicPartition : topicPartitions) {
            OffsetAndMetadata committed = consumer.committed(topicPartition);
            if (committed != null) {
                commitOffsetMap.put(topicPartition.partition(), committed.offset());
            }
        }

        // Sum the per-partition lag
        long lagSum = 0L;
        if (endOffsetMap.size() == commitOffsetMap.size()) {
            for (Integer partition : endOffsetMap.keySet()) {
                long endOffset = endOffsetMap.get(partition);
                long commitOffset = commitOffsetMap.get(partition);
                long diffOffset = endOffset - commitOffset;
                lagSum += diffOffset;
                System.out.println("Topic:" + topic + ", groupID:" + groupID
                        + ", partition:" + partition + ", endOffset:" + endOffset
                        + ", commitOffset:" + commitOffset + ", diffOffset:" + diffOffset);
            }
            System.out.println("Topic:" + topic + ", groupID:" + groupID + ", LAG:" + lagSum);
        } else {
            System.out.println("some partitions of this topic have no committed offset; cannot compute total lag");
        }

        consumer.close();
    }
}
```

An alternative approach follows kafka.tools.ConsumerOffsetChecker in the Kafka source: read the offsets directly from ZooKeeper nodes and obtain the logSize through the consumer's getOffsetsBefore method. Overall, this is more cumbersome.
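For the ZooKeeper route above, the old high-level consumer stored committed offsets under a fixed node layout. A minimal sketch of building that node path (the group, topic, and partition values are hypothetical; an actual implementation would still need a ZooKeeper client to read the node):

```java
public class ZkOffsetPathDemo {
    // Old high-level consumer ZooKeeper layout:
    // /consumers/<group>/offsets/<topic>/<partition>
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    public static void main(String[] args) {
        System.out.println(offsetPath("my-group", "my-topic", 0));
        // → /consumers/my-group/offsets/my-topic/0
    }
}
```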
