日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

kafka 维护消费状态跟踪的方法和消费进度的跟踪

發(fā)布時(shí)間:2023/12/20 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka 维护消费状态跟踪的方法和消费进度的跟踪 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

?kafka 維護(hù)消費(fèi)狀態(tài)跟蹤

大部分消息系統(tǒng)在 broker 端的維護(hù)消息被消費(fèi)的記錄:一個(gè)消息被分發(fā)到consumer 后 broker 就馬上進(jìn)行標(biāo)記或者等待 customer 的通知后進(jìn)行標(biāo)記。這樣也可以在消息在消費(fèi)后立馬刪除以減少空間占用。

但是這樣會(huì)不會(huì)有什么問(wèn)題呢?如果一條消息發(fā)送出去之后就立即被標(biāo)記為消費(fèi)過(guò)的,一旦 consumer處理消息時(shí)失敗了(比如程序崩潰)消息就丟失了。

為了解決這個(gè)問(wèn)題,很多消息系統(tǒng)提供了另外一個(gè)個(gè)功能:當(dāng)消息被發(fā)送出去之后僅僅被標(biāo)記為已發(fā)送狀態(tài),當(dāng)接到 consumer 已經(jīng)消費(fèi)成功的通知后才標(biāo)記為已被消費(fèi)的狀態(tài)。

這雖然解決了消息丟失的問(wèn)題,但產(chǎn)生了新問(wèn)題,首先如果 consumer處理消息成功了但是向 broker 發(fā)送響應(yīng)時(shí)失敗了,這條消息將被消費(fèi)兩次。第二個(gè)問(wèn)題時(shí),broker 必須維護(hù)每條消息的狀態(tài),并且每次都要先鎖住消息然后更改狀態(tài)然后釋放鎖》這樣麻煩又來(lái)了,且不說(shuō)要維護(hù)大量的狀態(tài)數(shù)據(jù),比如如果消息發(fā)送出去但沒(méi)有收到消費(fèi)成功的通知,這條消息將一直處于被鎖定的狀態(tài)

Kafka 采用了不同的策略.Topic 被分成了若干分區(qū),每個(gè)分區(qū)在同一時(shí)間只被一個(gè) consumer 消費(fèi)。這意味著每個(gè)分區(qū)被消費(fèi)的消息在日志中的位置僅僅是一個(gè)簡(jiǎn)單的整數(shù):offset。這樣就很容易標(biāo)記每個(gè)分區(qū)消費(fèi)狀態(tài)就很容易了,僅僅需要一個(gè)整數(shù)而已。這樣消費(fèi)狀態(tài)的跟蹤就很簡(jiǎn)單了。這帶來(lái)了另外一個(gè)好處:consumer 可以把 offset 調(diào)成一個(gè)較老的值,去重新消費(fèi)老的消息。這對(duì)傳統(tǒng)的消息系統(tǒng)來(lái)說(shuō)看起來(lái)有些不可思議,但確實(shí)是非常有用的,誰(shuí)規(guī)定了一條消息只能被消費(fèi)一次呢?
?

?消費(fèi)進(jìn)度的跟蹤

  • 所謂滯后程度,就是指消費(fèi)者當(dāng)前落后于生產(chǎn)者的程度。
  • Lag 應(yīng)該算是最最重要的監(jiān)控指標(biāo)了。它直接反映了一個(gè)消費(fèi)者的運(yùn)行情況。一個(gè)正常工作的消費(fèi)者,它的 Lag 值應(yīng)該很小,甚至是接近于 0 的,這表示該消費(fèi)者能夠及時(shí)地消費(fèi)生產(chǎn)者生產(chǎn)出來(lái)的消息,滯后程度很小。反之,如果一個(gè)消費(fèi)者 Lag 值很大,通常就表明它無(wú)法跟上生產(chǎn)者的速度,最終 Lag 會(huì)越來(lái)越大,從而拖慢下游消息的處理速度。
  • 通常來(lái)說(shuō),Lag 的單位是消息數(shù),而且我們一般是在主題這個(gè)級(jí)別上討論 Lag 的,但實(shí)際上,Kafka 監(jiān)控 Lag 的層級(jí)是在分區(qū)上的。如果要計(jì)算主題級(jí)別的,你需要手動(dòng)匯總所有主題分區(qū)的 Lag,將它們累加起來(lái),合并成最終的 Lag 值。
  • 你在實(shí)際業(yè)務(wù)場(chǎng)景中必須時(shí)刻關(guān)注消費(fèi)者的消費(fèi)進(jìn)度。
  • 使用 Kafka 自帶的命令行工具 kafka-consumer-groups 腳本。
  • 使用 Kafka Java Consumer API 編程。
  • 使用 Kafka 自帶的 JMX 監(jiān)控指標(biāo)。
  • 消費(fèi)進(jìn)度監(jiān)控3 種方法。
  • Kafka 自帶命令

  • ?Kafka 自帶的命令行工具?bin/kafka-consumer-groups.sh
  • kafka-consumer-groups 腳本是 Kafka 為我們提供的最直接的監(jiān)控消費(fèi)者消費(fèi)進(jìn)度的工具。
  • bin/kafka-consumer-groups.sh --bootstrap-server <Kafka broker連接信息> --describe --group <group名稱(chēng)>
  • ?
  • Kafka 連接信息就是 < 主機(jī)名:端口 > 對(duì),而 group 名稱(chēng)就是你的消費(fèi)者程序中設(shè)置的 group.id 值。
  • 示例: Kafka 集群的連接信息,即 localhost:9092。消費(fèi)者組名:testgroup
  • 它會(huì)按照消費(fèi)者組訂閱主題的分區(qū)進(jìn)行展示,每個(gè)分區(qū)一行數(shù)據(jù);其次,除了主題、分區(qū)等信息外,它會(huì)匯報(bào)每個(gè)分區(qū)當(dāng)前最新生產(chǎn)的消息的位移值(即 LOG-END-OFFSET 列值)、該消費(fèi)者組當(dāng)前最新消費(fèi)消息的位移值(即 CURRENT-OFFSET 值)、LAG 值(前兩者的差值)、消費(fèi)者實(shí)例 ID消費(fèi)者連接 Broker 的主機(jī)名以及消費(fèi)者的 CLIENT-ID 信息。
  • Kafka Java Consumer API [ Kafka 2.0.0 ]

  • 代碼示例:
  • 第 1 處是調(diào)用 AdminClient.listConsumerGroupOffsets 方法獲取給定消費(fèi)者組的最新消費(fèi)消息的位移;
  • 第 2 處則是獲取訂閱分區(qū)的最新消息位移;a
  • 第3 處就是執(zhí)行相應(yīng)的減法操作,獲取 Lag 值并封裝進(jìn)一個(gè) Map 對(duì)象。
  • public static Map<TopicPartition, Long> lagOf(String groupID, String bootstrapServers) throws TimeoutException {Properties props = new Properties();props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);try (AdminClient client = AdminClient.create(props)) {ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);try {Map<TopicPartition, OffsetAndMetadata> consumedOffsets = result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁止自動(dòng)提交位移props.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {Map<TopicPartition, Long> endOffsets = consumer.endOffsets(consumedOffsets.keySet());return endOffsets.entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey(),entry -> entry.getValue() - consumedOffsets.get(entry.getKey()).offset()));}} catch (InterruptedException e) {Thread.currentThread().interrupt();// 處理中斷異常// ...return Collections.emptyMap();} catch (ExecutionException e) {// 處理ExecutionException// ...return Collections.emptyMap();} catch (TimeoutException e) {throw new TimeoutException("Timed out when getting lag for consumer group " + groupID);}}}
  • ?
  • Kafka JMX 監(jiān)控指標(biāo)

  • Kafka 默認(rèn)提供的 JMX 監(jiān)控指標(biāo)來(lái)監(jiān)控消費(fèi)者的 Lag 值
  • Kafka 消費(fèi)者提供了一個(gè)名為?kafka.consumer:type=consumer-fetch-manager-metrics,client-id=“{client-id}”的 JMX 指標(biāo),里面有很多屬性。和我們今天所講內(nèi)容相關(guān)的有兩組屬性:records-lag-max?和?records-lead-min它們分別表示此消費(fèi)者在測(cè)試窗口時(shí)間內(nèi)曾經(jīng)達(dá)到的最大的 Lag 值和最小的 Lead 值。
  • Lead 值是指消費(fèi)者最新消費(fèi)消息的位移與分區(qū)當(dāng)前第一條消息位移的差值。很顯然,Lag 和 Lead 是一體的兩個(gè)方面:Lag 越大的話(huà),Lead 就越小,反之也是同理。
  • 監(jiān)控到 Lag 越來(lái)越大,消費(fèi)者程序變得越來(lái)越慢了,至少是追不上生產(chǎn)者程序了.
  • Lead 越來(lái)越小,甚至是快接近于 0 了,消費(fèi)者端要丟消息了
  • ?
  • ?
  • Kafka 的消息是有留存時(shí)間設(shè)置的,默認(rèn)是 1 周,也就是說(shuō) Kafka 默認(rèn)刪除 1 周前的數(shù)據(jù)。倘若你的消費(fèi)者程序足夠慢,慢到它要消費(fèi)的數(shù)據(jù)快被 Kafka 刪除了,這時(shí)你就必須立即處理,否則一定會(huì)出現(xiàn)消息被刪除,從而導(dǎo)致消費(fèi)者程序重新調(diào)整位移值的情形。這可能產(chǎn)生兩個(gè)后果:一個(gè)是消費(fèi)者從頭消費(fèi)一遍數(shù)據(jù),另一個(gè)是消費(fèi)者從最新的消息位移處開(kāi)始消費(fèi),之前沒(méi)來(lái)得及消費(fèi)的消息全部被跳過(guò)了,從而造成丟消息的假象。
  • ?
  • Kafka 消費(fèi)者還在分區(qū)級(jí)別提供了額外的 JMX 指標(biāo),用于單獨(dú)監(jiān)控分區(qū)級(jí)別的 Lag 和 Lead 值。JMX 名稱(chēng)為:kafka.consumer:type=consumer-fetch-manager-metrics,partition=“{partition}”,topic=“{topic}”,client-id=“{client-id}”
  • ?

    ?

    ?

    ?

    ?

    ?

    ?

    ?

    ?

    ?

    ?

    ?

    總結(jié)

    以上是生活随笔為你收集整理的kafka 维护消费状态跟踪的方法和消费进度的跟踪的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

    如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。