日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

kafka创建topic_一网打尽Kafka常用命令、脚本及配置,宜收藏!

發(fā)布時間:2024/9/19 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 kafka创建topic_一网打尽Kafka常用命令、脚本及配置,宜收藏! 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

前言

通過前面 7 篇文章的介紹,小伙伴們應該對 Kafka 運行工作原理有一個相對比較清晰的認識了。

Kafka是什么?一起來看看吧!

Kafka 安裝及簡單命令使用

Kafka中消息如何被存儲到Broker上?

Kafka消息發(fā)送時,網(wǎng)絡(luò)“偷偷”幫忙做的那點事兒

一文讀懂Kafka消費者背后的那點"貓膩"

Kafka消息在服務端存儲與讀取

Kafka集群內(nèi)部工作原理的那些事

為了提高平時的工作效率,幫助我們快速定位一些線上問題,比如查看部分 Partition 堆積機器 IP 等操作,這篇文章總結(jié)了一些平時常用到的一些 Kafka 命令及常用配置,方便日后查閱(該文章中提到的相關(guān)配置會持續(xù)更新)。

文章概覽

  • 常用腳本及命令總結(jié)。
  • 常用配置及說明。
  • 常用命令總結(jié)

    一. kafka-topic.sh 腳本相關(guān)常用命令,主要操作 Topic。

  • 創(chuàng)建名字為 "op_log" 的 Topic。
  • $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 3 --topic op_log
  • 查看指定 ZK?管理的 Topic 列表
  • $ bin/kafka-topics.sh --list --zookeeper localhost:2181

  • 查看指定 Topic 的詳細信息,包括 Partition 個數(shù),副本數(shù),ISR 信息
  • $ bin/kafka-topics.sh --zookeeper localhost:2181 --describe op_log
  • 修改指定 Topic 的 Partition 個數(shù)。注意:該命令只能增加 Partition 的個數(shù),不能減少 Partition 個數(shù),當修改的 Partition 個數(shù)小于當前的個數(shù),會報如下錯誤:Error while executing topic command : The number of partitions for a topic can only be increased
  • $ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic op_log1 --partition 4
  • 刪除名字為 "op_log" 的 Topic。注意如果直接執(zhí)行該命令,而沒有設(shè)置 delete.topic.enable 屬性為 true 時,是不會立即執(zhí)行刪除操作的,而是僅僅將指定的 Topic 標記為刪除狀態(tài),之后會啟動后臺刪除線程來刪除。
  • $ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic op_log

    二. kafka-consumer-groups.sh 腳本常用命令,主要用于操作消費組相關(guān)的。

  • 查看消費端的所有消費組信息(ZK 維護消費信息)
  • $ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
  • 查看消費端的所有消費組信息(Kafka 維護消費信息)
  • $ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
  • 查看指定 group 的詳細消費情況,包括當前消費的進度,消息積壓量等等信息(ZK 維護消息信息)
  • $ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-1291 --describe
  • 查看指定 group 的詳細消費情況,包括當前消費的進度,消息積壓量等等信息(Kafka 維護消費信息)
  • $ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group mygroup --describe

    三. kafka-consumer-offset-checker.sh 腳本常用命令,用于檢查 OffSet 相關(guān)信息。(注意:該腳本在 0.9 以后可以使用 kafka-consumer-groups.sh 腳本代替,官方已經(jīng)標注為 deprecated 了)

  • 查看指定 group 的 OffSet 信息、所有者等信息
  • $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291
  • 查看指定 group 的 OffSet 信息,包括消費者所在的 IP 信息等等
  • $ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291 --broker-info

    四. kafka-configs.sh 腳本常用命令,該腳本主要用于增加/修改 Kafka 集群的配置信息。

  • 對指定 entry 添加配置,其中 entry 可以是 topics、clients;如下,給指定的 topic 添加兩個屬性,分別為 max.message.byte、flush.message。
  • $ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --add-config 'max.message.bytes=50000000,flush.message=5'
  • 對指定 entry 刪除配置,其中 entry 可以是 topics、clients;如下。對指定的 topic 刪除 flush.message 屬性。
  • $ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --delete-config 'flush.message'
  • 查看指定 topic 的配置項。
  • $ bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name mytopic --describe

    五. kafka-reassign-partitions.sh 腳本相關(guān)常用命令,主要操作 Partition 的負載情況。

  • 生成 partition 的移動計劃,--broker-list 參數(shù)指定要挪動的 Broker 的范圍,--topics-to-move-json-file 參數(shù)指定 Json 配置文件
  • $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "0" --topics-to-move-json-file ~/json/op_log_topic-to-move.json --generate
  • 執(zhí)行 Partition 的移動計劃,--reassignment-json-file 參數(shù)指定 RePartition 后 Assigned Replica 的分布情況。
  • $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --execute
  • 檢查當前 rePartition 的進度情況。
  • $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --verify

    常用配置及說明

    kafka 常見重要配置說明,分為四部分

    • Broker Config:kafka 服務端的配置
    • Producer Config:生產(chǎn)端的配置
    • Consumer Config:消費端的配置
    • Kafka Connect Config:kafka 連接相關(guān)的配置

    Broker Config

  • zookeeper.connect
  • 連接 zookeeper 集群的地址,用于將 kafka 集群相關(guān)的元數(shù)據(jù)信息存儲到指定的 zookeeper 集群中

  • advertised.port
  • 注冊到 zookeeper 中的地址端口信息,在 IaaS 環(huán)境中,默認注冊到 zookeeper 中的是內(nèi)網(wǎng)地址,通過該配置指定外網(wǎng)訪問的地址及端口,advertised.host.name 和 advertised.port 作用和 advertised.port 差不多,在 0.10.x 之后,直接配置 advertised.port 即可,前兩個參數(shù)被廢棄掉了。

  • auto.create.topics.enable
  • 自動創(chuàng)建 topic,默認為 true。其作用是當向一個還沒有創(chuàng)建的 topic 發(fā)送消息時,此時會自動創(chuàng)建一個 topic,并同時設(shè)置 -num.partition 1 (partition 個數(shù)) 和 default.replication.factor (副本個數(shù),默認為 1) 參數(shù)。

    一般該參數(shù)需要手動關(guān)閉,因為自動創(chuàng)建會影響 topic 的管理,我們可以通過 kafka-topic.sh 腳本手動創(chuàng)建 topic,通常也是建議使用這種方式創(chuàng)建 topic。在 0.10.x 之后提供了 kafka-admin 包,可以使用其來創(chuàng)建 topic。

  • auto.leader.rebalance.enable
  • 自動 rebalance,默認為 true。其作用是通過后臺線程定期掃描檢查,在一定條件下觸發(fā)重新 leader 選舉;在生產(chǎn)環(huán)境中,不建議開啟,因為替換 leader 在性能上沒有什么提升。

  • background.threads
  • 后臺線程數(shù),默認為 10。用于后臺操作的線程,可以不用改動。

  • broker.id
  • Broker 的唯一標識,用于區(qū)分不同的 Broker。kafka 的檢查就是基于此 id 是否在 zookeeper 中/brokers/ids 目錄下是否有相應的 id 目錄來判定 Broker 是否健康。

  • compression.type
  • 壓縮類型。此配置可以接受的壓縮類型有 gzip、snappy、lz4。另外可以不設(shè)置,即保持和生產(chǎn)端相同的壓縮格式。

  • delete.topic.enable
  • 啟用刪除 topic。如果關(guān)閉,則無法使用 admin 工具進行 topic 的刪除操作。

  • leader.imbalance.check.interval.seconds
  • partition 檢查重新 rebalance 的周期時間

  • leader.imbalance.per.broker.percentage
  • 標識每個 Broker 失去平衡的比率,如果超過改比率,則執(zhí)行重新選舉 Broker 的 leader

  • log.dir / log.dirs
  • 保存 kafka 日志數(shù)據(jù)的位置。如果 log.dirs 沒有設(shè)置,則使用 log.dir 指定的目錄進行日志數(shù)據(jù)存儲。

  • log.flush.interval.messages
  • partition 分區(qū)的數(shù)據(jù)量達到指定大小時,對數(shù)據(jù)進行一次刷盤操作。比如設(shè)置了 1024k 大小,當 partition 積累的數(shù)據(jù)量達到這個數(shù)值時則將數(shù)據(jù)寫入到磁盤上。

  • log.flush.interval.ms
  • 數(shù)據(jù)寫入磁盤時間間隔,即內(nèi)存中的數(shù)據(jù)保留多久就持久化一次,如果沒有設(shè)置,則使用 log.flush.scheduler.interval.ms 參數(shù)指定的值。

  • log.retention.bytes
  • 表示 topic 的容量達到指定大小時,則對其數(shù)據(jù)進行清除操作,默認為-1,永遠不刪除。

  • log.retention.hours
  • 標示 topic 的數(shù)據(jù)最長保留多久,單位是小時

  • log.retention.minutes
  • 表示 topic 的數(shù)據(jù)最長保留多久,單位是分鐘,如果沒有設(shè)置該參數(shù),則使用 log.retention.hours 參數(shù)

  • log.retention.ms
  • 表示 topic 的數(shù)據(jù)最長保留多久,單位是毫秒,如果沒有設(shè)置該參數(shù),則使用 log.retention.minutes 參數(shù)

  • log.roll.hours
  • 新的 segment 創(chuàng)建周期,單位小時。kafka 數(shù)據(jù)是以 segment 存儲的,當周期時間到達時,就創(chuàng)建一個新的 segment 來存儲數(shù)據(jù)。

  • log.segment.bytessegment 的大小。當 segment 大小達到指定值時,就新創(chuàng)建一個 segment。

  • message.max.bytes

  • topic 能夠接收的最大文件大小。需要注意的是 producer 和 consumer 端設(shè)置的大小需要一致。

  • min.insync.replicas
  • 最小副本同步個數(shù)。當 producer 設(shè)置了 request.required.acks 為-1 時,則 topic 的副本數(shù)要同步至該參數(shù)指定的個數(shù),如果達不到,則 producer 端會產(chǎn)生異常。

  • num.io.threads
  • 指定 io 操作的線程數(shù)

  • num.network.threads
  • 執(zhí)行網(wǎng)絡(luò)操作的線程數(shù)

  • num.recovery.threads.per.data.dir
  • 每個數(shù)據(jù)目錄用于恢復數(shù)據(jù)的線程數(shù)

  • num.replica.fetchers
  • 從 leader 備份數(shù)據(jù)的線程數(shù)

  • offset.metadata.max.bytes
  • 允許消費者端保存 offset 的最大個數(shù)

  • offsets.commit.timeout.ms
  • offset 提交的延遲時間

  • offsets.topic.replication.factor
  • topic 的 offset 的備份數(shù)量。該參數(shù)建議設(shè)置更高保證系統(tǒng)更高的可用性

  • port
  • 端口號,Broker 對外提供訪問的端口號。

  • request.timeout.ms
  • Broker 接收到請求后的最長等待時間,如果超過設(shè)定時間,則會給客戶端發(fā)送錯誤信息

  • zookeeper.connection.timeout.ms
  • 客戶端和 zookeeper 建立連接的超時時間,如果沒有設(shè)置該參數(shù),則使用 zookeeper.session.timeout.ms 值

  • connections.max.idle.ms
  • 空連接的超時時間。即空閑連接超過該時間時則自動銷毀連接。

    Producer Config

  • bootstrap.servers
  • 服務端列表。即接收生產(chǎn)消息的服務端列表

  • key.serializer
  • 消息鍵的序列化方式。指定 key 的序列化類型

  • value.serializer
  • 消息內(nèi)容的序列化方式。指定 value 的序列化類型

  • acks
  • 消息寫入 Partition 的個數(shù)。通常可以設(shè)置為 0,1,all;當設(shè)置為 0 時,只需要將消息發(fā)送完成后就完成消息發(fā)送功能;當設(shè)置為 1 時,即 Leader Partition 接收到數(shù)據(jù)并完成落盤;當設(shè)置為 all 時,即主從 Partition 都接收到數(shù)據(jù)并落盤。

  • buffer.memory
  • 客戶端緩存大小。即 Producer 生產(chǎn)的數(shù)據(jù)量緩存達到指定值時,將緩存數(shù)據(jù)一次發(fā)送的 Broker 上。

  • compression.type
  • 壓縮類型。指定消息發(fā)送前的壓縮類型,通常有 none, gzip, snappy, or, lz4 四種。不指定時消息默認不壓縮。

  • retries
  • 消息發(fā)送失敗時重試次數(shù)。當該值設(shè)置大于 0 時,消息因為網(wǎng)絡(luò)異常等因素導致消息發(fā)送失敗進行重新發(fā)送的次數(shù)。

    Consumer Config

  • bootstrap.servers
  • 服務端列表。即消費端從指定的服務端列表中拉取消息進行消費。

  • key.deserializer
  • 消息鍵的反序列化方式。指定 key 的反序列化類型,與序列化時指定的類型相對應。

  • value.deserializer
  • 消息內(nèi)容的反序列化方式。指定 value 的反序列化類型,與序列化時指定的類型相對應。

  • fetch.min.bytes
  • 抓取消息的最小內(nèi)容。指定每次向服務端拉取的最小消息量。

  • group.id
  • 消費組中每個消費者的唯一表示。

  • heartbeat.interval.ms
  • 心跳檢查周期。即在周期性的向 group coordinator 發(fā)送心跳,當服務端發(fā)生 rebalance 時,會將消息發(fā)送給各個消費者。該參數(shù)值需要小于 session.timeout.ms,通常為后者的 1/3。

  • max.partition.fetch.bytes
  • Partition 每次返回的最大數(shù)據(jù)量大小。

  • session.timeout.ms
  • consumer 失效的時間。即 consumer 在指定的時間后,還沒有響應則認為該 consumer 已經(jīng)發(fā)生故障了。

  • auto.offset.reset
  • 當 kafka 中沒有初始偏移量或服務器上不存在偏移量時,指定從哪個位置開始消息消息。earliest:指定從頭開始;latest:從最新的數(shù)據(jù)開始消費。

    Kafka Connect Config

  • group.id
  • 消費者在消費組中的唯一標識

  • internal.key.converter
  • 內(nèi)部 key 的轉(zhuǎn)換類型。

  • internal.value.converter
  • 內(nèi)部 value 的轉(zhuǎn)換類型。

  • key.converter
  • 服務端接收到 key 時指定的轉(zhuǎn)換類型。

  • value.converter
  • 服務端接收到 value 時指定的轉(zhuǎn)換類型。

  • bootstrap.servers
  • 服務端列表。

  • heartbeat.interval.ms
  • 心跳檢測,與 consumer 中指定的配置含義相同。

  • session.timeout.ms
  • session 有效時間,與 consumer 中指定的配置含義相同。

    總結(jié)

    本文總結(jié)了平時經(jīng)常用到的一些 Kafka 配置及命令說明,方便隨時查看;喜歡的朋友可以收藏以備不時之需。

    下篇文章我們來分析一些經(jīng)常在面試中碰到的問題及相應的解決辦法,敬請期待。


    總結(jié)

    以上是生活随笔為你收集整理的kafka创建topic_一网打尽Kafka常用命令、脚本及配置,宜收藏!的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。