日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

java 连接kafka超时_java – Kafka KStreams – 处理超时

發布時間:2024/8/5 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 java 连接kafka超时_java – Kafka KStreams – 处理超时 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

我試圖使用< KStream> .process()與Time

Windows.of(“name”,30000)批量處理一些KTable值并發送它們.似乎30秒超過了消費者超時間隔,之后Kafka認為該消費者已經解散并釋放分區.

我已經嘗試提高輪詢頻率和提交間隔以避免這種情況:

config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "5000");

config.put(StreamsConfig.POLL_MS_CONFIG, "5000");

不幸的是,這些錯誤仍在發生:

(很多這些)

ERROR o.a.k.s.p.internals.RecordCollector - Error sending record to topic kafka_test1-write_aggregate2-changelog

org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for kafka_test1-write_aggregate2-changelog-0

其次是:

INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator 12.34.56.7:9092 (id: 2147483547 rack: null) dead for group kafka_test1

WARN o.a.k.s.p.internals.StreamThread - Failed to commit StreamTask #0_0 in thread [StreamThread-1]:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)

顯然,我需要更頻繁地將心跳發送回服務器.怎么樣?

我的拓撲結構是:

KStreamBuilder kStreamBuilder = new KStreamBuilder();

KStream lines = kStreamBuilder.stream(TOPIC);

KTable, String> kt = lines.aggregateByKey(

new DBAggregateInit(),

new DBAggregate(),

TimeWindows.of("write_aggregate2", 30000));

DBProcessorSupplier dbProcessorSupplier = new DBProcessorSupplier();

kt.toStream().process(dbProcessorSupplier);

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

KTable每隔30秒按鍵對值進行分組.在Processor.init()中,我調用context.schedule(30000).

DBProcessorSupplier提供DBProcessor的實例.這是AbstractProcessor的一個實現,其中提供了所有覆蓋.他們只做LOG,所以我知道每個人都被擊中.

這是一個非常簡單的拓撲結構,但很明顯我在某個地方錯過了一個步驟.

編輯:

我知道我可以在服務器端進行調整,但我希望有一個客戶端解決方案.我喜歡在客戶端退出/死亡時很快就可以使用分區的概念.

編輯:

為了簡化問題,我從圖中刪除了聚合步驟.它現在只是消費者 – >處理器(). (如果我將消費者直接發送到.print(),它會很快工作,所以我知道沒關系). (類似地,如果我通過.print()輸出聚合(KTable),它似乎也可以.

我發現.process() – 應該每隔30秒調用一次.punctuate()實際上阻塞了可變長度的時間并且隨機輸出(如果有的話).

進一步:

我將調試級別設置為’debug’并重新啟動.我看到很多消息:

DEBUG o.a.k.s.p.internals.StreamTask - Start processing one record [ConsumerRecord

但是.punctuate()函數中的斷點沒有被擊中.所以它做了很多工作,但沒有讓我有機會使用它.

總結

以上是生活随笔為你收集整理的java 连接kafka超时_java – Kafka KStreams – 处理超时的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。