日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka(六)Kafka基本客户端命令操作

發布時間:2024/8/23 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka(六)Kafka基本客户端命令操作 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

轉載自:https://blog.51cto.com/littledevil/2147950

主題管理

創建主題

如果配置了auto.create.topics.enable=true(這也是默認值)這樣當生產者向一個沒有創建的主題發送消息就會自動創建,其分區數量和副本數量也是有默認配置來控制的。

# 我們這里創建一個3個分區每個分區有2個副本的主題 kafka-topics.sh --create --zookeeper 172.16.48.171:2181/kafka --replication-factor 2 --partitions 3 --topic KafkaTest
--create表示建立
--zookeeper

表示ZK地址,可以傳遞多個,用逗號分隔

--zookeeper IP:PORT,IP:PORT,IP:PORT/kafka

--replication-factor表示副本數量,這里的數量是包含Leader副本和Follower副本,副本數量不能超過代理數量
--partitions表示主題的分區數量,必須傳遞該參數。Kafka的生產者和消費者采用多線程并行對主題的消息進行處理,每個線程處理一個分區,分區越多吞吐量就會越大,但是分區越多也意味著需要打開更多的文件句柄數量,這樣也會帶來一些開銷。
--topic表示主題名稱

在Zookeeper中可以看到如下信息

刪除主題

刪除有兩種方式手動和自動

  • 手動方式需要刪除各個節點日志路徑下的該主題所有分區,并且刪除zookeeper上/brokers/topics和/config/topics下的對應主題節點

  • 自動刪除就是通過腳本來完成,同時需要配置服務器配置文件中的delete.topic.enable=true,默認為false也就是說通過命令刪除主題只會刪除ZK中的節點,日志文件不會刪除需要手動清理,如果配置為true,則會自動刪除日志文件。

kafka-topics.sh --delete --zookeeper 172.16.48.171:2181/kafka --topic KafkaTest

下面的兩句話就是說該主題標記為刪除/admin/delete_topics節點下。實際數據沒有影響因為該參數沒有設置為true。

查看主題

# 列出所有主題 kafka-topics.sh --list --zookeeper 172.16.48.171:2181/kafka

下面是從ZK中看到的所有主題

# 查看所有主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka

# 查看特定主題信息 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topic BBB

Replicas:是AR列表,表示副本分布在哪些代理上,且該列表第一個元素就是Leader副本所在代理

ISR:該列表是顯示已經同步的副本集合,這個列表的副本都是存活的

# 通過--describe 和 --under-replicated-partitions 可以查看正在同步的主題或者同步可能發生異常, # 也就是ISR列表長度小于AR列表,如果一切正常則不會返回任何東西,也可以通過 --tipic 指定具體主題 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --under-replicated-partitions # 查看哪些主題建立時使用了單獨的配置 kafka-topics.sh --describe --zookeeper 172.16.48.171:2181/kafka --topics-with-overrides

這里只有一個內部主題__comsumer_offsets使用了非配置文件中的設置

?

配置管理

所謂配置就是參數,比如修改主題的默認參數。

主題級別的

# 查看配置 kafka-configs.sh --describe --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BB

這里顯示 Configs for topic 'BBB' are 表示它的配置有哪些,這里沒有表示沒有為該主題單獨設置配置,都是使用的默認配置。

# 增加一個配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --add-config flush.messages=2

如果修改的話還是相同的命令,只是把值修改一下

# 刪除配置 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --entity-type topics --entity-name BBB --alter --delete-config flush.messages

客戶端級別

這個主要是設置流控

# 設置指定消費者的流控 --entity-name 是客戶端在創建生產者或者消費者時是指定的client.id名稱 kafka-configs.sh --zookeeper 172.16.48.171:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name COMSUMER_NAME

下圖為ZK中對應的信息

?

分區管理

分區平衡

Leader副本在集群中應該是均衡分布,因為Leader副本對外提供讀寫服務,盡可能不讓同一個主題的多個Leader副本在同一個代理上,但是隨著時間推移比如故障轉移等情況發送,Leader副本可能不均衡。有兩種方式設置自動平衡,自動和手動。

自動就是在配置文件中增加?auto.leader.rebalance.enable?=?true?如果該項為false,當某個節點故障恢復并重新上線后,它原來的Leader副本也不會轉移回來,只是一個Follower副本。

手動就是通過命令來執行

kafka-preferred-replica-election.sh --zookeeper 172.16.48.171:2181/kafka

分區遷移

當下線一個節點需要將該節點上的分區副本遷移到其他可用節點上,Kafka并不會自動進行分區遷移,如果不遷移就會導致某些主題數據丟失和不可用的情況。當增加新節點時,只有新創建的主題才會分配到新節點上,之前的主題分區不會自動分配到新節點上,因為老的分區在創建時AR列表中沒有這個新節點。

上面2個主題,每個主題3個分區,每個分區3個副本,我們假設現在代理2要下線,所以我們要把代理2上的這兩個主題的分區數據遷移出來。

# 1. 在KAFKA目錄的config目錄中建立topics-to-move.json文件 {"topics":[{"topic":"AAA"},{"topic":"BBB"}],"version":1 }

?

# 2. 生成分區分配方案,只是生成一個方案信息然后輸出 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "1,2" --generate

這個命令的原理是從zookeeper中讀取主題元數據信息及制定的有效代理,根據分區副本分配算法重新計算指定主題的分區副本分配方案。把【Proposed partition reassignment configuration】下面的分區方案保存到一個JSON文件中,partitions-reassignment.json 文件名無所謂。

# 3. 執行方案 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute

# 4. 查看進度 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --verify

查看結果,這里已經沒有代理0了。

集群擴容

上面演示了節點下線的數據遷移,這里演示一下集群擴容的數據遷移。我們還是用上面兩個主題,假設代理0又重新上線了。其實擴容就是上面的反向操作

# 1. 建立JSON文件 # 該文件和之前的相同

?

# 2. 生成方案并保存到一個JSON文件中 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --topics-to-move-json-file ./topics-to-move.json --broker-list "0,1,2" --generate

# 3. 數據遷移,這里通過--throttle做一個限流操作,如果數據過大會把網絡堵塞。 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./partitions-reassignment.json --execute --throttle 1024

查看進度和結果

增加分區

通常在需要提供吞吐量的時候我們會增加分區,然后如果代理數量不擴大,同時生產者和消費者線程不增大,你擴展了分區也沒有用。

kafka-topics.sh --alter --zookeeper 172.16.48.171:2181/kafka --partitions 3 --topic KafkaTest03

增加副本

集群規模擴大并且想對所有主題或者指定主題提高可用性,那么可以增加原有主題的副本數量

上面是3個分區,每個分區1個副本,我們現在把每個分區擴展為3個副本

# 1. 創建JSON文件 replica-extends.json {"version": 1,"partitions": [{"topic": "KafkaTest04","partition": 0,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 1,"replicas": [0,1,2]},{"topic": "KafkaTest04","partition": 2,"replicas": [0,1,2]}] } # 2. 執行分區副本重新分配命令 kafka-reassign-partitions.sh --zookeeper 172.16.48.171:2181/kafka --reassignment-json-file ./replica-extends.json --execute

查看狀態

查看結果

?

鏡像操作

Kafka有一個鏡像工具kafka-mirror-maker.sh,用于將一個集群數據同步到另外一個集群中,這個非常有用,比如機房搬遷就需要進行數據同步。該工具的本質就是創建一個消費者,在源集群中需要遷移的主題消費數據,然后創建一個生產者,將消費的數據寫入到目標集群中。

首先創建消費者配置文件mirror-consumer.properties(文件路徑和名稱是自定義的)

# 源kafka集群代理地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092 # 消費者組名 group.id=mirror

其次創建生產者配置文件mirror-producer.properties(文件路徑和名稱是自定義的)

# 目標kafka集群地址列表 bootstrap.servers=IP1:9092,IP2:9092,IP3:9092

運行鏡像命令

# 通過 --whitelist 指定需要鏡像的主題,通過 --blacklist 指定不需要鏡像的主題 kafka-mirror-maker.sh --consumer.config PATH/mirror-consumer.properties --producer.config PATH/mirror-producer.properties --whitelist TOPIC

由于鏡像操作是啟動一個生產者和消費者,所以數據同步完成后這個生產者和消費者并不會關閉,它會依然等待新數據,所以同步完成以后你需要自己查看,確認完成了則關閉生產者和消費者。

總結

以上是生活随笔為你收集整理的Kafka(六)Kafka基本客户端命令操作的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。