Kafka消息处理与集群维护
磁盤重認識?
當需要從磁盤讀取數據時,要確定讀的數據在哪個磁道,哪個扇區:
-
首先必須找到柱面,即磁頭需要移動對準相應磁道,這個過程叫做尋道,所耗費時間叫做尋道時間;
-
然后目標扇區旋轉到磁頭下,這個過程耗費的時間叫做旋轉時間;
一次訪盤請求(讀/寫)完成過程由三個動作組成
- 尋道(時間):磁頭移動定位到指定磁道;
- 旋轉延遲(時間):等待指定扇區從磁頭下旋轉經過;
- 數據傳輸(時間):數據在磁盤、內存與網絡之間的實際傳輸
由于存儲介質的特性,磁盤本身存取就比主存慢,再加上機械運動耗費,磁盤的存取速度往往是主存的幾百分之一甚至幾千分支一
怎么樣才能提高磁盤的讀寫效率呢?
根據數據的局部性原理 ,有以下兩種方法
- 預讀或者提前讀;
- 合并寫——多個邏輯上的寫操作合并成一個大的物理寫操作中;
即采用磁盤順序讀寫(不需要尋道時間,只需很少的旋轉時間)。實驗結果:在一個6 7200rpm SATA RAID-5 的磁盤陣列上線性寫的速度大概是300M/秒,但是隨機寫的速度只有50K/秒,兩者相差將近10000倍。
RAID說明:
??? 好處:能夠將多塊小磁盤合并為一塊大磁盤,并且RAID-5允許一塊磁盤掛掉,RAID-6允許2塊磁盤掛掉
??? 弊端:多塊小磁盤的同時并發讀寫的效率遠大于RAID單塊大磁盤的讀寫效率
Kafka消息的寫入原理
一般的將數據從文件傳到套接字的路徑:
- 操作系統將數據從磁盤讀到內核空間的頁緩存中;
- 應用將數據從內核空間讀到用戶空間的緩存中;
- 應用將數據寫回內存空間的套接字緩存中
- 操作系統將數據從套接字緩存寫到網卡緩存中,以便將數據經網絡發出;
這樣做明顯是低效的,這里有四次拷貝,兩次系統調用。如果使用sendfile(Java 為: FileChannel.transferTo ??api),兩次拷貝可以被避免:允許操作系統將數據直接從頁緩存發送到網絡上。優化后,只有最后一步將數據拷貝到網卡緩存中是需要的
Kafka
topic信息
[root@localhost zhangwx]# kafka-topics.sh --describe --zookeeper localhost:12181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
說明:
PartitionCount:1 //表示分區數量為1 ReplicationFactor:3 //表示每個分區的副本數為3 Configs:Topic: my-replicated-topic Partition: 0 //表示一個分區的分區編號為0 Leader: 0 //表示分區leader為broker0級 即第一臺機器 Replicas: 2,0,1 //表示副本所在的機器為broker2,broker0,broker1三臺機器上 Isr: 0,1,2 //處于同步中的broker,假設leader為broker0掛掉了,那么可以從isr中選舉出其它機器作為leader Kafka消息刪除原理
從最久的日志段開始刪除(按日志段為單位進行刪除),然后逐步向前推進,直到某個日志段不滿足條件為止,刪除條件(日志段表示一個個的.log日志文件)
- 滿足給定條件predicate(配置項log.retention.{ms,minutes,hours}和log.retention.bytes指定);
- 不能是當前激活日志段;(激活日志段表示正在操作的日志)
- 大小不能小于日志段的最小大小(配置項log.segment.bytes配置)
- 要刪除的是否是所有日志段,如果是的話直接調用roll方法進行切分,因為Kafka至少要保留一個日志段;
Kafka消息檢索原理
Kafka消息segment
file組成和物理結構
以讀取offset=368776的message為例,需要通過下面2個步驟查找:
- 第一步查找segment file;
以上圖為例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1。只要根據offset二分查找文件列表,就可以快速定位到具體文件。當offset=368776時定位到00000000000000368769.index|log
- 第二步通過segment file查找message;
算出368776-368770=6,取00000000000000368769.index文件第三項(6,1407),得出從00000000000000368769.log文件頭偏移1407字節讀取一條消息即可
Kafka集群維護
集群信息實時查看(topic工具):
列出集群當前所有可用的topic:
bin/kafka-topics.sh –list –zookeeper?? zookeeper_address
查看集群特定topic 信息:
bin/kafka-topics.sh –describe –zookeeper zookeeper_address
?–topic topic_name
集群信息實時修改(topic工具):
創建topic:
bin/kafka-topics.sh –create –zookeeper zookeeper_address??? –replication-factor 1 –partitions 1 –topic topic_name
增加(不能減少) partition(最后的4是增加后的值):
(注:一般來說當數據量逐漸增多時,我們需要提高該topic的并發度,就可以通過增加partition的數量來提高)
bin/kafka-topics.sh –zookeeper zookeeper_address??? –alter –topic topic_name???? –partitions 4
Topic-level configuration 配置都能修改
Kafka集群leader平衡機制
每個partitiion的所有replicas叫做“assigned replicas”,“assigned replicas”中的第一個replicas叫“preferred replica”,剛創建的topic一般“preferred replica”是leader。下圖中Partition 0的broker? 2就是preferred replica”,默認會成為該分區的leader。
集群leader平衡:
bin/kafka-preferred-replica-election.sh –zookeeper zookeeper_address
auto.leader.rebalance.enable=true
集群分區日志遷移:
遷移topic數據到其他broker,請遵循下面四步:
寫json文件,文件格式如下:
cat topics-to-move.json{"topics": [{"topic": "foo1"},{"topic": "foo2"}],"version":1}
使用–generate生成遷移計劃(下面的操作是將topic: foo1和foo2移動到broker 5,6):
bin/kafka-reassign-partitions.sh –zookeeper localhost:2181
–topics-to-move-json-file topics-to-move.json –broker-list “5,6” –generate
這一步只是生成計劃,并沒有執行數據遷移;
使用–execute執行計劃:
bin/kafka-reassign-partitions.sh –zookeeper localhost:2181
–reassignment-json-file expand-cluster-reassignment.json –execute
執行前最好保存當前的分配情況,以防出錯回滾使用–verify驗證是否已經遷移完成
遷移某個topic的某些特定的partition數據到其他broker,步驟與上面一樣,但是json文件如下面所示:
cat custom-reassignment.json
{“version”:1,”partitions”:[{“topic”:”foo1″,”partition”:0,”replicas”:[5,6]},{“topic”:”foo2″,”partition”:1,”replicas”:[2,3]}]}
可以指定到topic的分區編號
kafka-reassign-partitions.sh工具會復制磁盤上的日志文件,只有當完全復制完成,才會刪除遷移前磁盤上的日志文件。執行分區日志遷移需要注意:
kafka-reassign-partitions.sh 工具的粒度只能到broker,不能到broker的目錄(如果broker上面配置了多個目錄,是按照磁盤上面已駐留的分區數來均勻分配的),所以,如果topic之間的數據,或者topic的partition之間的數據本身就不均勻,很有可能造成磁盤數據的不均勻:
對于分區數據較多的分區遷移數據會花大量的時間,所以建議在topic數據量較少或磁盤有效數據較少的情況下執行數據遷移操作;
進行分區遷移時最好先保留一個分區在原來的磁盤,這樣不會影響正常的消費和生產,如果目的是將分區5(brober1,5)遷移到borker2,3??梢韵葘?遷移到2,1,最后再遷移到2,3。而不是一次將1,5遷移到2,3。因為一次遷移所有的副本,無法正常消費和生產,部分遷移則可以正常消費和生產
Kafka集群監控
Kafka
Offset Minotor介紹
(只能監控單一集群,且不能進行相關操作,只能查看集群基本信息,安裝過程需要翻墻,安裝好后通過機器:端口即可訪問)
在生產環境需要集群高可用,所以需要對Kafka集群進行監控。Kafka Offset Monitor可以監控Kafka集群以下幾項:
- Kafka集群當前存活的broker集合;
- Kafka集群當前活動topic集合;
- 消費者組列表
- Kafka集群當前consumer按組消費的offset lag數(即當前topic當前分區目前有多少消息積壓而沒有及時消費)
部署Kafka Offset Minotor:
github下載jar包KafkaOffsetMonitor-assembly-0.2.0.jar :
https://github.com/quantifind/KafkaOffsetMonitor/releases
啟動Kafka Offset Minotor :
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb –zk zk-01,zk-02 –refresh 5.minutes –retain 1.day &
Kafka Manager使用
(相對于Kafka Offset Minotor功能更強大,可以監控多集群且可以進行相關寫操作)
Kafka Manager由雅虎開源,提供以下功能:
- 管理幾個不同的集群;
- 容易地檢查集群的狀態(topics, brokers, 副本的分布, 分區的分布) ;
- 選擇副本
- 基于集群的當前狀態產生分區分配
- 重新分配分區
Kafka Manager的安裝,方法一(不但要求能上網,還要求能翻墻):
安裝sbt:
http://www.scala-sbt.org/download.html
下載后,解壓并配置環境變量(將SBT_HOME/bin配置到PATH變量中)
安裝Kafka Manager :
git clone?https://github.com/yahoo/kafka-manager
cd kafka-manager
sbt clean dist
部署Kafka Manager
修改conf/application.conf,把kafka-manager.zkhosts改為自己的zookeeper服務器地址
bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8007 &
Kafka Manager的安裝,方法二:
下載打包好的Kafka manager:
https://github.com/scootli/kafka-manager-1.0-SNAPSHOT/tree/master/kafka-manager-1.0-SNAPSHOT
下載后解壓
修改conf/application.conf,把Kafka-manager.zkhosts改為自己的zookeeper服務器地址
bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port=8007?&
總結
以上是生活随笔為你收集整理的Kafka消息处理与集群维护的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 办理营业执照需要什么材料
- 下一篇: J2EE 企业级应用架构简述