日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka_rebalance过长问题排查

發布時間:2024/2/28 编程问答 17 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka_rebalance过长问题排查 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

    • 問題簡介
      • 使用場景
    • 原因分析
    • 進一步探索
      • 先看server端設置超時的代碼
      • consumer 端的rebalanceTimeoutMs
    • 解決方案

問題簡介

??最近kafka集群頻繁出現了長時間rebalance(耗時5min級別),kafka在rebalance期間對應的consumer group中的consumer都是無法poll()下來數據的,導致consumer消費kafka當中數據出現了較大的延遲。

使用場景

a,b,c 三個consumer同屬于一個group test_scheduled
但是a,b都是定時任務型的,c是持續消費的。

比如a在13:10:00啟動,每隔3個小時啟動一次
b在13:15:00 啟動,每隔3小時啟動一次
定時任務的consumer 在處理完任務后會暫停,調用的是pause方法

consumer.pause(partitionList) /*** @see KafkaConsumer#pause(Collection)*public void pause(Collection<TopicPartition> partitions);*/

consumer的設置是

props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", 1000);props.put("session.timeout.ms", 120000);props.put("max.poll.interval.ms",600000);props.put("max.poll.records", 100);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

這個時候在某些情況下可能會導致rebalance的時間過長
具體的情景是
a在13:10:00啟動 這個時候進行了一次rebalance,很快(秒級),但是a只花費了不到1分鐘就把kafka里面積攢了3個小時的數據處理完了,所以a在13:10:05 進入了pause()

當時間到達了13:15:00 的時候后b啟動了,這個時候又觸發了rebalance,但是這個時候的rebalance直到
13:20:00 才能結束 (通過日志查看是在a 在max.poll.interval.ms過期的時候離開test_scheduled 然后rebalance 結束)

server.log中的有用信息有

rebalance的開始和結束 [2019-02-18 13:15:00,015] INFO [GroupCoordinator 2]: Preparing to rebalance group test_scheduled with old generation 702 (__consumer_offsets-4) (kafka.coordinator.group.GroupCoordinator) ...2019-02-18 13:20:00,245] DEBUG [GroupCoordinator 2]: Member consumer-68-e26f835f-4e14-4e3e-9768-8dbdbeac06f3 in group test_scheduled has left, removing it from the group (kafka.coordinator.group.GroupCoordinator) [2019-02-18 13:20:00,245] INFO [GroupCoordinator 2]: Stabilized group test_scheduled generation 703 (__consumer_offsets-4) (kafka.coordinator.group.GroupCoordinator)

client端的一些信息
a離開group的信息

2019-02-18 13:20:00.243 kafka-coordinator-heartbeat-thread | test_scheduled DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator :183 [Consumer clientId=consumer-68, groupId=test_scheduled] Sending LeaveGroup request to coordinator 10.9.17.46:9092 (id: 2147483645 rack: null) 2019-02-18 13:20:00.244 kafka-coordinator-heartbeat-thread | test_scheduled DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator :177 [Consumer clientId=consumer-68, groupId=test_scheduled] Disabling heartbeat thread

同時,上面的問題只是一部分情況,有些時候并不在定時任務啟動或者離開的時候也會發生很多次rebalance,但是相對來說快一些,有些也達到幾十秒。

原因分析

??為了解決以上問題,調研了一下kafka consumer的原理,下面主要圍繞幾個重要的配置項展開。

session.timeout.ms 默認10000ms heartbeat.interval.ms 默認3000ms max.poll.interval.ms 默認300000ms max.poll.records 默認500條

?? session.timeout.ms是consumer和kafka server維持一個會話的時間,也就是說consumer和server之間通信的間隔時間最長是這些,超過這個時間的話server就認為consumer不可用,會被從consumer group當中踢掉。因為現在consumer有一個專門的heartbeat后臺線程來維持心跳,默認的時間間隔是 heartbeat.interval.ms 默認3000ms,所以這個配置不用擔心

??max.poll.interval.ms是consumer在兩次poll()之間的最大時間間隔,超過這個時間配置的consumer都會被從consumer group 當中踢掉。這樣的話,在兩次poll()中間的數據處理時間久需要控制了。默認的時間是 300000,也就是5分鐘,同時每次拉下來的數據條數受max.poll.records控制,默認最多為500條。

??回過頭來看我們系統情況,我們的數據有些關聯數據比較多,可能存在一個批次的數據消費處理時間超過5min。在這種理論基礎上,我們將max.poll.interval.ms加大,同時將max.poll.records減小到100,這個時候再觀察,發現rebalance的次數明顯下降,從原來的每小時30次下降到7次左右。
??但是rebalance耗時比較長的情況仍然存在。這個時候考慮是因為定時任務的啟動和結束導致的rebalance,但是為何rebalance耗時5分鐘仍然是不可理解的。
后面在kafka的官方文檔中有這個的配置:rebalance.timeout.ms文檔上介紹的就是rebalance會等待consumer 發起join-group請求的最大時長,默認是60s,但是這個配置是針對的kafka-connect的,不是我們這里的。

進一步探索

多方查找不得結果,最后只能看代碼了。

先看server端設置超時的代碼

對應源碼點擊這里

private def prepareRebalance(group: GroupMetadata, reason: String) {// if any members are awaiting sync, cancel their request and have them rejoinif (group.is(CompletingRebalance))resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)val delayedRebalance = if (group.is(Empty))new InitialDelayedJoin(this,joinPurgatory,group,groupConfig.groupInitialRebalanceDelayMs,groupConfig.groupInitialRebalanceDelayMs,max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))else//因為我們發生rebalance的時候一般情況下group不是enpty,所以大多數走的是這個,// 可以看到這里用的是group.rebalanceTimeoutMsnew DelayedJoin(this, group, group.rebalanceTimeoutMs)group.transitionTo(PreparingRebalance)info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")val groupKey = GroupKey(group.groupId)joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))}

從上面的代碼中可以看到
這里用的是group.rebalanceTimeoutMs,感覺沒啥問題
下面再看看group.rebalanceTimeoutMs具體的實現

對應源碼點擊這里
在GroupMetadata.scala文件當中

private val members = new mutable.HashMap[String, MemberMetadata]def rebalanceTimeoutMs = members.values.foldLeft(0) { (timeout, member) =>timeout.max(member.rebalanceTimeoutMs)}

從上面可以看出group.rebalanceTimeoutMs是去group中所有consumer的最大的member.rebalanceTimeoutMs
對應的server段在prepare階段設置的超時時間就是使用的max{consumer.rebalanceTimeoutMs}
對應的member則是MemberMetadata

class MemberMetadata(val memberId: String,val groupId: String,val clientId: String,val clientHost: String,val rebalanceTimeoutMs: Int,val sessionTimeoutMs: Int,val protocolType: String,var supportedProtocols: List[(String, Array[Byte])])

這里對應的就是每個consumer自己設定的rebalanceTimeoutMs

server端排查了一番,問題不大,而且使用的參數也是consumer傳遞過來的。那么下面就要看看consumer端的實現了。

consumer 端的rebalanceTimeoutMs

AbstractCoordinator.sendJoinGroupRequest()中有往server端發送request時候設置的
rebalanceTimeout

/*** Join the group and return the assignment for the next generation. This function handles both* JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if* elected leader by the coordinator.* @return A request future which wraps the assignment returned from the group leader*/private RequestFuture<ByteBuffer> sendJoinGroupRequest() {if (coordinatorUnknown())return RequestFuture.coordinatorNotAvailable();// send a join group request to the coordinatorlog.info("(Re-)joining group");JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(groupId,this.sessionTimeoutMs,this.generation.memberId,protocolType(),//這里設置的rebalanceTimeoutMsmetadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);return client.send(coordinator, requestBuilder).compose(new JoinGroupResponseHandler());}

那么coordinator中的rebalanceTimeoutMs又是從哪里設置的呢,這個可以從其構造函數中進行追溯。

/*** Initialize the coordination manager.*/public AbstractCoordinator(LogContext logContext,ConsumerNetworkClient client,String groupId,int rebalanceTimeoutMs,int sessionTimeoutMs,int heartbeatIntervalMs,Metrics metrics,String metricGrpPrefix,Time time,long retryBackoffMs,boolean leaveGroupOnClose) {this.log = logContext.logger(AbstractCoordinator.class);this.client = client;this.time = time;this.groupId = groupId;this.rebalanceTimeoutMs = rebalanceTimeoutMs;this.sessionTimeoutMs = sessionTimeoutMs;}/*** Initialize the coordination manager.*/public ConsumerCoordinator(LogContext logContext,ConsumerNetworkClient client,String groupId,int rebalanceTimeoutMs,int sessionTimeoutMs,int heartbeatIntervalMs,List<PartitionAssignor> assignors,Metadata metadata,SubscriptionState subscriptions,Metrics metrics,String metricGrpPrefix,Time time,long retryBackoffMs,boolean autoCommitEnabled,int autoCommitIntervalMs,ConsumerInterceptors<?, ?> interceptors,boolean excludeInternalTopics,final boolean leaveGroupOnClose) {super(logContext,client,groupId,rebalanceTimeoutMs,sessionTimeoutMs,heartbeatIntervalMs,metrics,metricGrpPrefix,time,retryBackoffMs,leaveGroupOnClose);}

實際的賦值動作中設置consumer實例的rebalanceTimeoutMs的時候使用的是max.poll.interval.ms
而不是 rebalance.timeout.ms

this.coordinator = new ConsumerCoordinator(logContext,this.client,groupId,config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),assignors,this.metadata,this.subscriptions,metrics,metricGrpPrefix,this.time,retryBackoffMs,config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),this.interceptors,config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG),config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG));

哭暈在廁所…

解決方案

將定時任務的consumer單獨放在一個分組consumer-group,因為定時任務對rebalance時間的延遲不敏感,這樣的話就不會影響實時消費的consumer了,同時建議max.poll.interval.ms 不要設置的太長,否則會影響kafka的rebalance,導致rebalance的耗時過長。如果任務確實比較耗時的話也應該設置為異步處理然后手動提交的方式,同時在consumer端設置pause,避免導致活鎖。

總結

以上是生活随笔為你收集整理的kafka_rebalance过长问题排查的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。