java检测kafka是否连接成功,Kafka 消费者失败检测
用于測試消費者故障檢測的簡單kafka客戶端不提供預期的行為 . 我一定錯過了什么 .
使用kafka版本0.10.1.0和使用java kafka-client 0.10.1.0的使用者進行測試 .
下課是平行兩次午餐 . 正如所料,一個客戶正在消費該組中的主題 . 但是如果使用kill -9殺死活躍的消費者,則該組不會重新 balancer 到其他消費者 .
public class BasicConsumer {
public BasicConsumer() {
// set up the consumer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000"); // half a minute timeout
props.put("max.poll.records", "10");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
System.out.printf("Starting Consumer %n");
KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1"));
LocalDateTime inFewMinutes = LocalDateTime.now().plusMinutes(10);
try {
while (LocalDateTime.now().isBefore(inFewMinutes)) {
ConsumerRecords records = consumer.poll(1000);
System.out.printf("%s Poll returned %d records%n", LocalDateTime.now(), records.count());
for (ConsumerRecord record : records) {
Map message = new Gson().fromJson(record.value(), Map.class);
Map data = (Map) message.get("data");
String msgId = (String) data.get("TRANSFER_ID");
System.out.printf("%s Handling record id %s with offset %s%n", LocalDateTime.now(), msgId, record.offset());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} finally {
consumer.close();
System.out.printf("Consumer closed cleanly...%n");
}
}
}
kafka和zookeeper服務器是簡單的安裝,無需對配置進行任何修改 .
提前感謝任何想法 .
late edition :問題已解決
為了殺死消費者,我停止了用于啟動java客戶端的 gradle run 命令 . 這實際上并沒有阻止java進程......
正確殺死java進程表明,在日志中,被殺死的活動消費者與使用kafka進行的重新 balancer 之間會出現30秒的延遲,以便將手提供給第二個消費者 . 正如session.timeout.ms參數所預期的那樣 .
總結
以上是生活随笔為你收集整理的java检测kafka是否连接成功,Kafka 消费者失败检测的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 毕设/私活/bigold必备项目,一个挣
- 下一篇: reg类型变量综合电路_verilog语