kafka常用命令及问题解决
常用命令
Kafka內部提供了許多管理腳本,這些腳本都放在$KAFKA_HOME/bin目錄下,而這些類的實現都是放在源碼的kafka/core/src/main/scala/kafka/tools/路徑下。
topic相關
kafka-topics.sh
kafka-topics.sh用于維護topic。包括create, delete, describe, change
#創建topic kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 2 --partitions 6 --topic topic.real.mes.huoju kafka-topics.sh --list --bootstrap-server kafka1:9092 #刪除topic。有更多其他配合,不一定刪除得了。 kafka-topics.sh --delete --bootstrap-server kafka1:9092 --topic topic.real.mesoptions:
| –create | 創建topic. |
| –describe | 獲取topic信息. |
| –list | List all available topics. |
| –alter | 修改topic:Alter the number of partitions,replica assignment, and/or configuration for the topic. |
| –bootstrap-server | 必需。Kafka server列表 |
| –command-config | 配置文件 |
| –config <String: name=value> | 配置信息。 |
| –delete-config <String: name> | 不支持 --bootstrap-server option. |
| –disable-rack-aware | Disable rack aware replica assignment |
| –exclude-internal | 排除內部topic |
| –force | Suppress console prompts |
| –if-exists | 僅topic存在才執行,不支持 --bootstrap-server. |
| –if-not-exists | 僅topic不存在才執行,不支持 --bootstrap-server. |
| –partitions <Integer: # of partitions> | 創建或修改的partition 列表 |
| –replica-assignment | 副本與broker id賦值。 |
| –replication-factor <Integer:replication factor> | 未設置,使用cluster默認值. |
| –topic <String: topic> | topic名稱. 除了–create,其他都支持正則表達式,用雙引號包括。\用于轉義 |
| –topics-with-overrides | describe topic時,僅顯示have overridden configs |
| –unavailable-partitions | describe topic時,僅顯示leader is not available |
| –under-min-isr-partitions | describe topic時,僅顯示比指定數字小的isr。不支持 --zookeeper |
| –at-min-isr-partitions | describe topic時,僅顯示等于指定數字的isr |
| –under-replicated-partitions | describe topic時,僅顯示指定分區 |
| –version | Display Kafka version. |
| –zookeeper <String: hosts> | zookeeper列表。(廢棄). |
生產者相關
kafka-console-producer.sh
發送數據到topic
kafka-console-producer.sh --broker-list kafka1:9092 --topic topic.real.mesoptions:
| –batch-size <Integer: size> | 單個批處理中發送的消息數(default: 200) |
| –bootstrap-server | |
| –broker-list <String: broker-list> | 廢棄。 |
| –compression-codec [String:compression-codec] | 壓縮編解碼器: either ‘none’, ‘gzip’, ‘snappy’, ‘lz4’, or ‘zstd’. 默認值:gzip |
| –line-reader <String: reader_class> | 從標準輸入讀取信息的class類名稱。 (default: kafka.tools.ConsoleProducer$LineMessageReader) |
| –max-block-ms <Long: ms> | 在發送請求期間,生產者將阻止的最長時間。(default: 60000) |
| –max-memory-bytes <Long: bytes> | 內存(default: 33554432,23M) |
| –max-partition-memory-bytes <Long: bytes> | 為分區分配的緩沖區大小(default: 16384,16K) |
| –message-send-max-retries | 最大的重試發送次數(default: 3) |
| –metadata-expiry-ms <Long: > | 強制更新元數據的時間閾值(ms)(default: 300000) |
| –producer-property <String:producer_prop> | 將自定義屬性傳遞給生成器的機制。形如:key=value |
| –producer.config <String: config file> | 生產者配置屬性文件。[–producer-property]優先于此配置 |
| –property <String: prop> | 自定義消息讀取器。 |
| –request-required-acks String: | 生產者請求的確認方式。0、1(默認值)、all |
| –request-timeout-ms <Integer: ms> | 生產者請求的確認超時時間 默認值:1500 |
| –retry-backoff-ms | 生產者重試前,刷新元數據的等待時間閾值(default: 100) |
| –socket-buffer-size <Integer: size> | TCP接收緩沖大小. (default: 102400) |
| –sync | 同步發送消息 |
| –timeout <Integer: timeout_ms> | 異步發送模式,超時時間。默認值:1000 |
| –topic | |
| –version |
消費者相關
kafka-console-consumer.sh
kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic topic.real.mesoptions:
| –topic | string | 被消費的topic | |
| –whitelist | string | 正則表達式,指定要包含以供使用的主題的白名單 | |
| –partition | integer | 指定分區 除非指定’–offset’,否則從分區結束(latest)開始消費 | |
| –offset | string | 執行消費的起始offset位置 默認值:latest | latest earliest |
| –consumer-property | string | 將用戶定義的屬性以key=value的形式傳遞給使用者 | |
| –consumer.config | string | 消費者配置屬性文件 請注意,[consumer-property]優先于此配置 | |
| –formatter | string | 用于格式化kafka消息以供顯示的類的名稱 默認值:kafka.tools.DefaultMessageFormatter | kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter |
| –property | string | 初始化消息格式化程序的屬性 | print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> |
| –from-beginning | 從存在的最早消息開始,而不是從最新消息開始 | ||
| –max-messages | integer | 消費的最大數據量,若不指定,則持續消費下去 | |
| –timeout-ms | integer | 在指定時間間隔內沒有消息可用時退出 | |
| –skip-message-on-error | 如果處理消息時出錯,請跳過它而不是暫停 | ||
| –bootstrap-server | string | 必需(除非使用舊版本的消費者),要連接的服務器 | |
| –key-deserializer | string | ||
| –value-deserializer | string | ||
| –enable-systest-events | 除記錄消費的消息外,還記錄消費者的生命周期 (用于系統測試) | ||
| –isolation-level | string | 設置為read_committed以過濾掉未提交的事務性消息 設置為read_uncommitted以讀取所有消息 默認值:read_uncommitted | |
| –group | string | 指定消費者所屬組的ID | |
| –blacklist | string | 要從消費中排除的主題黑名單 | |
| –csv-reporter-enabled | 如果設置,將啟用csv metrics報告器 | ||
| –delete-consumer-offsets | 如果指定,則啟動時刪除zookeeper中的消費者信息 | ||
| –metrics-dir | string | 輸出csv度量值 需與[csv-reporter-enable]配合使用 | |
| –zookeeper | string | 必需(僅當使用舊的使用者時)連接zookeeper的字符串。 可以給出多個URL以允許故障轉移 |
kafka-consumer-groups.sh
用于查詢,維護消費組。
#顯示消費情況kafka-consumer-groups.sh --group consumer.group.realme123 --describe --bootstrap-server kafka1:9092 #設置到最晚offset kafka-consumer-groups.sh --group consumer.group.realme123 --bootstrap-server kafka1:9092 --topic topic.real.mes --reset-offsets --to-latest --execute #設置到指定offset kafka-consumer-groups.sh --group consumer.group.realme123 --bootstrap-server kafka1:9092 --topic topic.real.mes --reset-offsets --to-offset 80000000 --executeoptions:
| –all-groups | 應用所有消費組. |
| –all-topics | 一個組消費的所有topic。用于reset-offsets. |
| –bootstrap-server | |
| –by-duration <String: duration> | 設置offset(離當前時間duration的位置). Format: ‘PnDTnHnMnS’ |
| –command-config <String: config property file> | |
| –delete | 刪除指定群組中的topic partition offsets and ownership。 |
| –delete-offsets | 刪除offsets ,一次1個group,多個topic。 |
| –describe | |
| –from-file <String: path to CSV file> | Reset offsets to values defined in CSV file. |
| –group <String: consumer group> | The consumer group we wish to act on. |
| –help | Print usage information. |
| –list | List all consumer groups. |
| –members | Describe members of the group. 僅支持 ‘–describe’、 ‘–bootstrap-server’ options |
| –offsets | 描述group和topic。 僅支持 ‘–describe’、 ‘–bootstrap-server’ options |
| –state | 描述狀態。僅支持 ‘–describe’、 ‘–bootstrap-server’ options |
| –reset-offsets | 重置offset。offset支持: --to-datetime,–by-period, --to-earliest, --to-latest, --shift-by, --from-file, --to-current. 操作支持:–dry-run(默認), --execute,-- export |
| –dry-run | 僅顯示結果,不真正生效。 |
| –execute | 修改生效。 |
| –export | 導出操作 to a CSV file. Supported operations: reset-offsets. |
| –shift-by <Long: number-of-offsets> | 指定離當前的偏移量 ‘n’,可以是正負值。 |
| –to-current | 當前offset. |
| –to-datetime <String: datetime> | 指定時間。Format: ‘YYYY-MM-DDTHH:mm:SS.sss’ |
| –to-earliest | 最早. |
| –to-latest | 最晚. |
| –to-offset <Long: offset> | 指定offset. |
| –topic <String: topic> | 指定topic。reset-offsets可以指定partition格式:topic1:0,1,2。 |
| –verbose | 提供輔助信息。 |
| –timeout | <Long: timeout (ms)> (default: 5000) |
| –version |
有些操作不能在topic被消費時執行,不然會提示:
? Assignments can only be reset if the group ’ is inactive, but the current state is Stable
通用命令
kafka-run-class.sh
運行一個class,調用kafka的tools的部分功能。
kafka-run-class.sh [-daemon] [-name servicename] [-loggc] classname [opts]GetOffsetShell
kafka-run-class.sh kafka.tools.GetOffsetShell /? #獲取offset kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka1:9092 --topic topic.real.mes --time -1options:
| –broker-list | |
| –max-wait-ms <Integer: ms> | 廢棄。(default: 1000) |
| –offsets <Integer: count> | 廢棄。(default: 1) |
| –partitions <String: partition ids> | partition id列表 |
| –time <Long: > | 時間戳。返回指定時間戳之前的offset。 timestamp/-1(latest,默認值)/-2(earliest)。如果時間戳大于當前時刻,無offset返回。 |
| –topic <String: topic> |
ConsumerOffsetChecker
主要是運行kafka.tools.ConsumerOffsetChecker類,對應的腳本是kafka-consumer-offset-checker.sh,會顯示出Consumer的Group、Topic、分區ID、分區對應已經消費的Offset、logSize大小,Lag以及Owner等信息。
DumpLogSegments
驗證日志索引是否正確,或者從log文件中直接打印消息。
ExportZkOffsets
導出Zookeeper中Group相關的偏移量。
JmxTool
打印出Kafka相關的metrics信息
KafkaMigrationTool
將Kafka 0.7上面的數據遷移到Kafka 0.8。
MirrorMaker
同步兩個Kafka集群的數據
服務管理
#啟動kafka服務 kafka-server-start.sh #停止kafka服務 kafka-server-stop.sh問題
1、kafka 異常 WARN Error while fetching metadata with correlation id xxx
原因:從zookeeper獲取到的kafka的信息,需要有外部監聽,注意配置kafka的listeners。
修改config下的 server.properties 文件 將 listeners=PLAINTEXT://:9092 修改成listeners=PLAINTEXT://ip:9092不同kafka 鏡像的配置可能不相同:
KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181ALLOW_PLAINTEXT_LISTENER: 'yes'KAFKA_INTER_BROKER_LISTENER_NAME: INTERNALKAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXTKAFKA_CFG_LISTENERS: INTERNAL://:9092,EXTERNAL://:${MS_KAFKA_EXT_PORT}KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL://${MS_KAFKA_EXT_HOST}:${MS_KAFKA_EXT_PORT}KAFKA_CFG_LOG_RETENTION_HOURS: 64 #或者KAFKA_ADVERTISED_HOST_NAME: 192.168.1.63KAFKA_ADVERTISED_PORT: 9192KAFKA_ZOOKEEPER_CONNECT: zk1:2181,zk2:2181,zk3:2181DELETE_TOPIC_ENBLE: "true"KAFKA_BROKER_ID: 12、徹底刪除topic
徹底刪除Kafka中的topic
1)、刪除kafka存儲目錄(server.properties文件log.dirs配置,默認為"/tmp/kafka-logs")相關topic目錄
2)、Kafka 刪除topic的命令是:
./bin/kafka-topics --delete --zookeeper 【zookeeper server】 --topic 【topic name】#如果kafaka啟動時加載的配置文件中server.properties沒有配置delete.topic.enable=true,那么此時的刪除并不是真正的刪除,而是把topic標記為:marked for deletion#查看所有topic:./bin/kafka-topics --zookeeper 【zookeeper server】 --list 此時你若想真正刪除它,可以如下操作:(1)登錄zookeeper客戶端:命令:./bin/zookeeper-client(2)找到topic所在的目錄:ls /brokers/topics(3)找到要刪除的topic,執行命令:rmr /brokers/topics/【topic name】即可,此時topic被徹底刪除。另外被標記為marked for deletion的topic你可以在zookeeper客戶端中通過命令獲得:ls /admin/delete_topics/【topic name】, 如果你刪除了此處的topic,那么marked for deletion 標記消失 zookeeper 的config中也有有關topic的信息: ls /config/topics/【topic name】暫時不知道有什么用總結:
徹底刪除topic:
1、刪除kafka存儲目錄(server.properties文件log.dirs配置,默認為"/tmp/kafka-logs")相關topic目錄
2、如果配置了delete.topic.enable=true直接通過命令刪除,如果命令刪除不掉,直接通過zookeeper-client 刪除掉broker下的topic即可。
3 、查看某個group消費topic的offset,并重置。
使用 kafka-consumer-groups.sh 命令,見前面。
4、查看kafka版本
沒有對應命令,進入kafka安裝目錄libs下。查看kafka_* 開頭的jar包,
-rw-r--r-- 1 kafka kafka 821 Feb 14 2017 kafka_2.11-0.10.2.0-test-sources.jar.asc -rw-r--r-- 1 kafka kafka 3452117 Feb 14 2017 kafka_2.11-0.10.2.0-test.jar -rw-r--r-- 1 kafka kafka 821 Feb 14 2017 kafka_2.11-0.10.2.0-test.jar.asc -rw-r--r-- 1 kafka kafka 5641281 Feb 14 2017 kafka_2.11-0.10.2.0.jar -rw-r--r-- 1 kafka kafka 821 Feb 14 2017 kafka_2.11-0.10.2.0.jar.asc2.11是 scala版本,0.10.2.0是 kafka版本。
參考
kafka的bin目錄下的其他工具
connect-distributed.sh connect-mirror-maker.sh connect-standalone.sh kafka-acls.sh kafka-broker-api-versions.sh kafka-configs.sh kafka-consumer-perf-test.sh kafka-delegation-tokens.sh kafka-delete-records.sh kafka-dump-log.sh kafka-leader-election.sh kafka-log-dirs.sh kafka-mirror-maker.sh kafka-preferred-replica-election.sh kafka-producer-perf-test.sh kafka-reassign-partitions.sh kafka-replica-verification.sh kafka-streams-application-reset.sh kafka-verifiable-consumer.sh kafka-verifiable-producer.sh trogdor.sh zookeeper-security-migration.sh zookeeper-server-start.sh zookeeper-server-stop.sh zookeeper-shell.shkafka.tools下有什么類
git:https://github.com/apache/kafka/tree/2.5
路徑:core/src/main/scala/kafka/tools。
ConsoleConsumer ConsoleProducer ConsumerPerformance DumpLogSegments EndToEndLatency GetOffsetShell JmxTool MirrorMaker PerfConfig ReplicaVerificationTool StateChangeLogMerger StreamsResetter文檔
官網:http://kafka.apache.org/documentation/#gettingStarted
中文文檔:https://kafka.apachecn.org/
總結
以上是生活随笔為你收集整理的kafka常用命令及问题解决的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: InfluxDB基本使用说明
- 下一篇: kafka技术内幕(一)