Kafka(六)Kafka基本客户端命令操作
轉載自:https://blog.51cto.com/littledevil/2147950
主題管理
創建主題
如果配置了auto.create.topics.enable=true(這也是默認值)這樣當生產者向一個沒有創建的主題發送消息就會自動創建,其分區數量和副本數量也是有默認配置來控制的。
# 我們這里創建一個3個分區每個分區有2個副本的主題 kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest| --create | 表示建立 |
| --zookeeper | 表示ZK地址,可以傳遞多個,用逗號分隔 --zookeeper IP:PORT,IP:PORT,IP:PORT/kafka |
| --replication-factor | 表示副本數量,這里的數量是包含Leader副本和Follower副本,副本數量不能超過代理數量 |
| --partitions | 表示主題的分區數量,必須傳遞該參數。Kafka的生產者和消費者采用多線程并行對主題的消息進行處理,每個線程處理一個分區,分區越多吞吐量就會越大,但是分區越多也意味著需要打開更多的文件句柄數量,這樣也會帶來一些開銷。 |
| --topic | 表示主題名稱 |
在Zookeeper中可以看到如下信息
刪除主題
刪除有兩種方式手動和自動
-
手動方式需要刪除各個節點日志路徑下的該主題所有分區,并且刪除zookeeper上/brokers/topics和/config/topics下的對應主題節點
-
自動刪除就是通過腳本來完成,同時需要配置服務器配置文件中的delete.topic.enable=true,默認為false也就是說通過命令刪除主題只會刪除ZK中的節點,日志文件不會刪除需要手動清理,如果配置為true,則會自動刪除日志文件。
下面的兩句話就是說該主題標記為刪除/admin/delete_topics節點下。實際數據沒有影響因為該參數沒有設置為true。
查看主題
# 列出所有主題 kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka下面是從ZK中看到的所有主題
# 查看所有主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka # 查看特定主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBBReplicas:是AR列表,表示副本分布在哪些代理上,且該列表第一個元素就是Leader副本所在代理
ISR:該列表是顯示已經同步的副本集合,這個列表的副本都是存活的
# 通過--describe 和 --under-replicated-partitions 可以查看正在同步的主題或者同步可能發生異常, # 也就是ISR列表長度小于AR列表,如果一切正常則不會返回任何東西,也可以通過 --tipic 指定具體主題 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions # 查看哪些主題建立時使用了單獨的配置 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides這里只有一個內部主題__comsumer_offsets使用了非配置文件中的設置
?
配置管理
所謂配置就是參數,比如修改主題的默認參數。
主題級別的
# 查看配置 kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB這里顯示 Configs for topic 'BBB' are 表示它的配置有哪些,這里沒有表示沒有為該主題單獨設置配置,都是使用的默認配置。
# 增加一個配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2如果修改的話還是相同的命令,只是把值修改一下
# 刪除配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages客戶端級別
這個主要是設置流控
# 設置指定消費者的流控 --entity-name 是客戶端在創建生產者或者消費者時是指定的client.id名稱 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME下圖為ZK中對應的信息
?
分區管理
分區平衡
Leader副本在集群中應該是均衡分布,因為Leader副本對外提供讀寫服務,盡可能不讓同一個主題的多個Leader副本在同一個代理上,但是隨著時間推移比如故障轉移等情況發送,Leader副本可能不均衡。有兩種方式設置自動平衡,自動和手動。
自動就是在配置文件中增加?auto.leader.rebalance.enable?=?true?如果該項為false,當某個節點故障恢復并重新上線后,它原來的Leader副本也不會轉移回來,只是一個Follower副本。
手動就是通過命令來執行
kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka分區遷移
當下線一個節點需要將該節點上的分區副本遷移到其他可用節點上,Kafka并不會自動進行分區遷移,如果不遷移就會導致某些主題數據丟失和不可用的情況。當增加新節點時,只有新創建的主題才會分配到新節點上,之前的主題分區不會自動分配到新節點上,因為老的分區在創建時AR列表中沒有這個新節點。
上面2個主題,每個主題3個分區,每個分區3個副本,我們假設現在代理2要下線,所以我們要把代理2上的這兩個主題的分區數據遷移出來。
# 1. 在KAFKA目錄的config目錄中建立topics-to-move.json文件 {"topics":[{"topic":"AAA"},{"topic":"BBB"}],"version":1 }?
# 2. 生成分區分配方案,只是生成一個方案信息然后輸出 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate這個命令的原理是從zookeeper中讀取主題元數據信息及制定的有效代理,根據分區副本分配算法重新計算指定主題的分區副本分配方案。把【Proposed partition reassignment configuration】下面的分區方案保存到一個JSON文件中,partitions-reassignment.json 文件名無所謂。
# 3. 執行方案 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute # 4. 查看進度 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify查看結果,這里已經沒有代理0了。
集群擴容
上面演示了節點下線的數據遷移,這里演示一下集群擴容的數據遷移。我們還是用上面兩個主題,假設代理0又重新上線了。其實擴容就是上面的反向操作
# 1. 建立JSON文件 # 該文件和之前的相同?
# 2. 生成方案并保存到一個JSON文件中 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate # 3. 數據遷移,這里通過--throttle做一個限流操作,如果數據過大會把網絡堵塞。 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024查看進度和結果
增加分區
通常在需要提供吞吐量的時候我們會增加分區,然后如果代理數量不擴大,同時生產者和消費者線程不增大,你擴展了分區也沒有用。
kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03增加副本
集群規模擴大并且想對所有主題或者指定主題提高可用性,那么可以增加原有主題的副本數量
上面是3個分區,每個分區1個副本,我們現在把每個分區擴展為3個副本
# 1. 創建JSON文件 replica-extends.json {"version": 1,"partitions": [{"topic": "KafkaTest04","partition": 0,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 1,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 2,"replicas": [0,1,2]}] } # 2. 執行分區副本重新分配命令 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute查看狀態
查看結果
?
鏡像操作
Kafka有一個鏡像工具kafka-mirror-maker.sh,用于將一個集群數據同步到另外一個集群中,這個非常有用,比如機房搬遷就需要進行數據同步。該工具的本質就是創建一個消費者,在源集群中需要遷移的主題消費數據,然后創建一個生產者,將消費的數據寫入到目標集群中。
首先創建消費者配置文件mirror-consumer.properties(文件路徑和名稱是自定義的)
# 源kafka集群代理地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092 # 消費者組名 group.id=mirror其次創建生產者配置文件mirror-producer.properties(文件路徑和名稱是自定義的)
# 目標kafka集群地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092運行鏡像命令
# 通過 --whitelist 指定需要鏡像的主題,通過 --blacklist 指定不需要鏡像的主題 kafka-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC由于鏡像操作是啟動一個生產者和消費者,所以數據同步完成后這個生產者和消費者并不會關閉,它會依然等待新數據,所以同步完成以后你需要自己查看,確認完成了則關閉生產者和消費者。
總結
以上是生活随笔為你收集整理的Kafka(六)Kafka基本客户端命令操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink ProcessFuncti
- 下一篇: leveldb使用指南