学习Kafka
?
Kafka
- ?理解消息隊列
- ?認識kafka
- ?kafka核心概念
- ?kafka結構
- ?安裝啟動kafka
- ?使用kafka
消息隊列
知識要點
- 背景、問題的產生
- 消息隊列應運而生
- 消息隊列的特點
2.1 背景、問題的產生
傳統單體應用逐漸被SOA架構、微服務體系架構所替代,如此一來系統數目爆炸級增長,原來在一個系統之間的數據交互演變成跨系統、跨區域。
如何來解決數據信息的傳輸呢?使用Rest Api進行HTTP通信? 還是采用Web Service ?
- HTTP Rest?通信
- 傳輸雙方需要維護各自的接口url,需要編寫大量的對接服務
- 如果一方存在變更,另一方不得不被動的改變迭代上線,影響生產的穩定性
- 除了同步接口方式,不得不為了保證數據的完整性,采取一套異步機制
- 如此一來,雙方在主要業務流程之外,還需要對數據的傳輸花費大量的人力來維護,本末倒置
- Web Service?傳輸
- Web Service 僅僅將HTTP的方式包裝了一個服務注冊地,使得調用雙方無需關注真實地址
- 但是并沒有從根本上將調用雙方解耦,業務還是受到雙方服務調用的限制
2.2 消息隊列應運而生
為了應對數據在錯綜復雜的大型項目中傳輸困難問題,消息隊列應運而生。
==消息隊列(Message Queue)可以理解是一個容器,用于存放數據==。生產者可以將數據傳輸到消息隊列中,消費者再從消息隊列中獲取需要的數據信息,這樣,生產者、消費者互相解耦。
- 原來的交互
轉存失敗重新上傳取消
- 使用消息隊列后
轉存失敗重新上傳取消轉存失敗重新上傳取消
使用消息隊列后,交互雙方進行了解耦;并且減少了交互次數。
因為消費者大都數時候僅僅需要關注數據本身,所以其設計往往具有高吞吐性能,這使得我們在進行一些特定場景比如電商大促、秒殺場景的時候就可以采用消息隊列進行“削峰填谷”。
2.3 消息隊列的特點
消息隊列需要支持服務雙方調用解耦、應對高并發場景,一般具有以下特點:
- 數據持久化存儲
- 未免數據丟失,消息隊列應該可以提供配置方式選擇將數據進行持久化
- 讀寫快
- 消息隊列需要支撐“削峰填谷”,應對海量數據交互,要求設計合理,內部讀寫數據快
- 一般采用==“零拷貝技術”==、==mmap==技術等:可以參見?RocketMQ?技術實現要點。
- 集群部署
- 要實現高可用,MQ應該支持集群部署,能實現故障轉移
- 水平擴展
- 應該支持水平擴展,通過增加服務器提高集群的可用性、性能
- [支持事務消息]
- 可選的特性:如 RocketMQ 實現了該特性。
認識kafka
知識要點
- 認識kafka
- kafka的應用場景
2.1 認識kafka
kafka是一個消息隊列產品,擁有高吞吐、水平擴展的特性,但是對于業務性數據支持不強,一般使用它做日志消息處理平臺使用。
kafka用于構建實時的數據管道或者應用。可以水平擴展、具有容錯機制、非常快速,并且已經在數以千計的公司生產環境中使用了。
kafka由Scala和Java編寫。Kafka是一種高吞吐量的分布式發布訂閱消息系統。
2.2 kafka的應用場景
kafka可以處理消費者在網站中的所有動作流數據。 這些數據通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop一樣的日志數據和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的并行加載機制來統一線上和離線的消息處理,也是為了通過集群來提供實時的消息。
誰在使用kafka:
轉存失敗重新上傳取消轉存失敗重新上傳取消
kafka核心概念
知識要點
- kafka相關概念
- 深入理解kafka核心概念
2.1 kafka相關概念
-
Topic
- kafka存儲數據記錄的結構單元
-
數據記錄
- 每一個數據記錄由一個 key、value、timestamp 組成
-
Producer?消費者
- 可以發布流式記錄到一個或者多個kafka topic 的應用
-
Consumer?消費者
- 訂閱一個或者多個topic的應用并且可以處理這些流式記錄數據
-
Stream?流
- 扮演流式處理器,可以從一個或者多個topic中消費一個輸入流,并且產生一個輸出流到一個或者多個topic,可以將輸入流轉換為輸出流
-
Connector?連接器
- 允許構建、運行一個可重用的生產者或者消費者,用以連接到kafka的topic。例如一個關系型數據庫的連接器可以捕獲一個表的變更情況。
- 可以關注RocketMQ Connector生態項目。
2.2 深入理解kafka核心概念
kafka的一些核心概念需要我們重點掌握。
- Topics 和Logs
- 分布式 Distribution
- 地理復制 Geo-Replication
- 生產者 Producer
- 消費者 Consumer
- 多租戶 Multi-tenancy
- 可靠性的保證 Guarantees
2.2.1 Topics 和Logs
Topic 是kafka中記錄發布數據消息的類別或者名稱;
Topic可以存在多個訂閱者;
對于每一個Topic,kafka集群會維護一個分區的日志log:
轉存失敗重新上傳取消
每一個分區都是有序的,不可變的序列,他會被連續的追加到——結構化的commit log中。
分區中的每一個記錄都會被分配一個序列id編號——offset,在每個分區中是唯一的。
kafka集群會持久化保存所有發布的消息記錄,不論是否被消費過都會持久化存儲——可以配置滯留時間(retention)。例如,如果滯留策略設置為2天,2天內可以存儲并被消費掉,超過這個時間將會被回收丟棄,釋放空間。對于數據的大小而言,kafka的性能實際上是穩定的,所以存儲數據很長時間都不是問題,為什么?因為kafka是順序寫數據,分區會記錄offset,所以存儲很多數據和少量數據差異不大(當然不要太大了哦)。
轉存失敗重新上傳取消轉存失敗重新上傳取消
實際上,每一個消費者保留的元數據是該消費者在日志中的偏移量或者位置。
該偏移量由消費者控制:通常消費者在讀取記錄時會線性地推進偏移量,但事實上,由于該位置由消費者控制,所以它可以按照自己喜歡的任何順序來消費記錄。例如,一個消費者可以重置一個舊的順序offset去處理舊數據或者跳到前面去消費更“及時”的數據。
這個特性意味著kafka中的消費者非常的“隨意”——他們可以來去自如而不會對集群或者其他的消費者影響。例如,你可以使用命令行工具“tail”到任意的topic的尾部而不需要變更那些已經被消費者消費國過的topic。
分區在log服務中有幾個作用。第一個是,允許log被擴展到適合單個服務器的大小,每一個獨立的的分區必須符合托管它的server服務器,但是一個topic會有多個分區,所以它可以處理任意數量的數據。第二個是,它們可以作為并行機制的單元——或多或少是這樣。
2.2.2 分布式 Distribution
日志的分區被分布式地存儲在kafka集群中,每一個server處理數據和分區共享請求。每一個分區都會通過一個可配置的server編號被復制,用于容錯處理。
每一個分區都有一個server,扮演“leader”的角色,0個或者多個server扮演“follers”角色。
leader 負責處理該分區的所有的讀、寫請求;
followers 被動的復制leader的數據。
如果leader掛了,followers中的其中一個會自動成為新的leader。
每個server都是它的分區的leader或者是其他分區的follower,所以這樣可以保持集群的負載均衡。
2.2.3 地理復制 Geo-Replication
kafka 鏡像制造者提供了集群的地理復制功能。通過MirrorMaker,消息可以跨多個數據中心或云區域復制。
你可以在 active/passive 場景中備份或者恢復時使用這個功能。
2.2.4 生產者 Producer
生產者是發布消息數據到topic的角色。
生產者負責選擇將數據記錄分配到topic的哪一個分區。
可以通過簡單的負載均衡算法如輪詢或者其它方式來實現。
2.2.5 消費者 Consumer
消費者需要通過一個消費者組名(Consumer group)來標志它們,每一個被發布到topic的數據記錄都會被被訂閱的消費組中的一個消費者消者消費。消費者實例可以在不同的進程中或者不同的機器中。
如果所有的消費者實例都有一個相同的消費者組名,數據記錄將會通過負載均衡機制到消費者實例中被消費。
如果所有的消費者實例都是不同的消費者組名,沒一個數據記錄都會被廣播到其他所有的消費者實例中去。
轉存失敗重新上傳取消轉存失敗重新上傳取消
1個2個server的kafka集群主機,且有4各分區,2個消費者組。消費者組A有2個消費者,組B有4個消費者實例。
大都數情況下,topic一般只會有少部分消費者組,每一個都叫做“邏輯訂閱者”。
每個組都由很多消費者實例組成,可擴展以及容錯。
這也是發布-訂閱的語義,只不過訂閱的是一個消費者集群而不是單個的進程。
kafka中消費方式的實現是在消費實例上劃分日志中的分區,以便每個實例在任何時間點都是分區“公平份額”的唯一消費方。這個消費者組的關系的過程是由kafka協議動態維護的。如果新的消費者實例加入到組中,就會從其他的消費者那里接管一些分區;如果實例掛了,它的分區將會分發給剩下的實例。
kafka只提供了分區內的順序有效記錄,而不是在topic的多個分區中有序。
對于大多數應用來說,針對分區內排序或者按key劃分數據的能力就夠用了。但是,如果你想讓一個topic全部有序,那需要將topic全放在一個分區里面,這意味著在消費組中僅僅有一個消費者去處理該topic。
2.2.6 多租戶 Multi-tenancy
kafka可以部署為多租戶的解決方案,是通過決定讓哪些topic可以生產、消費數據來配置的。
同樣也支持配額限制,管理員可以在請求上定義和實施配額,以控制客戶端使用的代理資源。
2.2.7 可靠性的保證 Guarantees
一個高可用的kafka提供了以下得保證:
- 生產者發送消息到特定的topic分區是順序追加的。
- 如果記錄 M1、M2先后被同一個生產者發送,那么 M1 的offset 會比 M2 的小,且先于 M2 在log中出現。
- 一個消費者實例也是按順序查看log中存儲的消息記錄的
- 對于一個復制因子(replication factor)為N的topic,我們可以容忍 N-1 個server掛掉,而不會丟失任何commited log中的數據。
kafka結構
知識要點
- kafka內部原理
- kafka結構圖
- kafka 集群示意圖
2.1 kafka消息模式
kafka支持兩種消息模式:
- p2p模式(隊列模式 Queuing)
- 發布訂閱模式(publish-subscribe)
2.1.1 點對點p2p模式(隊列模式 Queuing):
在點對點模式中,一群消費者會從一個server中讀取數據,每一個數據會被消費者中的一個給處理。
該模式允許你將數據的處理分發到多個消費者實例中,這可以讓你擴展你的消費過程。
但是,在該模式下如果一旦一個消費者處理過后,數據就丟失了。
轉存失敗重新上傳取消正在上傳…重新上傳取消
2.1.2 發布訂閱模式(publish-subscribe):
在發布訂閱模式中,數據記錄會被廣播到所有的消費者中。
也因而缺失了處理過程的可擴展性,因為每個消息都會到每個訂閱者中。
轉存失敗重新上傳取消正在上傳…重新上傳取消
但是 kafka 的消費者組概念涵蓋了這兩個模式。
對于一個queue,消費者組允許你將處理劃分為多個進程(消費者組的成員)。
對于publish-subscribe,kafka 允許你廣播消息到多個消息組。
kafka模型的優點使得每一個topic都擁有這兩種特性——可以擴展處理進程、也可以有多個訂閱者——也就沒必要去選擇其中一個。
一個傳統的queue在server上順序保存數據記錄,如果多個消費者從隊列中消費數據,則server會按存儲的順序分發記錄。盡管server是按順序分發數據的,但是數據記錄是異步地到達消費者中的,所以有可能不是按原來的順序到達不同的消費者中的。這意味著在并行消費的過程中,失去了原有的順序表現形式。消息系統通常使用“獨占消費者”的概念來解決這個問題,只允許一個進程從queue中進行消費,但是這意味著失去了并行處理的功效。
kafka在這一點做的比較好,通過一個并行概念——==分區partition==——在topic中來實現。
kafka提供了一群消費者進程處理的順序保證、負載均衡,這是通過在topic中分配分區給消費組中的消費者來做到的,每個分區partition都只會被消費組中的一個消費者給消費。這樣做就能確保消費者是按分區的存儲順序進行消費的。盡管有很多分區,這樣還是能保持很多消費者實例的負載均衡。==但是請注意,在消費者組中不能存在比分區還多的消費者實例,多余的消費者實例將不會參與消費處理==。
2.2 kafka結構圖
轉存失敗重新上傳取消轉存失敗重新上傳取消
2.3 kafka 集群示意圖
轉存失敗重新上傳取消轉存失敗重新上傳取消
安裝啟動kafka
知識要點
- 下載安裝kafka
- 啟動kafka
- 啟動
2.1 下載安裝kafka
kafka控制腳本在基于Unix的系統和windows平臺中是不同的,在windows中是在bin\windows目錄下而不是bin/目錄,并且腳本擴展名為.bat。
先下載kafka,在官網推薦的mirror下載地址下載 :?https://www.apache.org/dyn/closer.cgi?path=/kafka/2.3.0/kafka_2.12-2.3.0.tgz,然后解壓。
解壓后的目錄:
轉存失敗重新上傳取消
2.2 啟動kafka
Kafka 使用Zookeeper,所以你首先需要啟動Zookeeepr(后文簡稱ZK)服務(沒有使用過Zookeeper的朋友可以點擊這里參考入門教程)。
也可以使用kafka中打包好的,一個單節點Zookeeper實例 來使用。
類unix平臺使用已經打包好的Zk:
> bin/zookeeper-server-start.sh config/zookeeper.propertieswindows平臺:
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties這樣就啟動了一個單機的ZK實例。
現在可以啟動kafka服務了。
類unix平臺:
> bin/kafka-server-start.sh config/server.propertieswindows平臺:
.\bin\windows\kafka-server-start.bat .\config\server.properties使用kafka
知識要點
- 創建topic
- 生產者發行消息
- 消費者消費消息
- 集群部署
2.1 使用kafka
開始使用kafka,以下命令均在kafka根目錄下執行。
2.1.1 創建一個topic
- 創建 topic
我們使用單個分區、一個副本來創建一個叫做"test" 的topic。
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test或者在windows平臺使用:
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test- 查看 topic
或者在windows平臺:
.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092 test2.1.2 發送一些消息數據
kafka有一個客戶端可以從文件或者控制臺獲取輸入作為消息傳遞給kafka集群。默認情況下,每個行會作為一個單獨的消息發送。
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another messagewindows平臺:
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test >This is a message >This is another message2.1.4 啟動一個消費者
kafka也有控制臺命令操作消費者,可以轉出消息作為輸出流:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another messagewindows平臺:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message如果你是在2不同的命令行窗口打開的,現在可以在第一個producer窗口繼續輸入信息,然后就會在另一個consumer窗口輸出打印到控制臺了...
所有的命令行工具都有可選項;不使用任何參數運行命令行則會展示一些幫助信息給你。
2.1.5 啟動多個broker的集群kafka
到目前為止我們已經擁有了一個單例的broker,但是這不太好玩。
對于kafka,一個單例的broker僅僅是一個節點規模為1的集群,所以對于啟動更多的broker實例,并不會有什么太大的變化。但是對于更好的理解這一點,我們將我們的集群增加到3個節點。
-
首先拷貝我們的?config/server.properties
- 拷貝兩次,并且重命名為?config/server-1.properties、?config/server-2.properties
-
然后修改配置文件的key如下值,其余的保持不變
-
config/server-1.properties:
- broker.id=1 listeners=PLAINTEXT://:9093 log.dirs=/tmp/kafka-logs-1
-
config/server-2.properties
- broker.id=2 listeners=PLAINTEXT://:9094 log.dirs=/tmp/kafka-logs-2
-
==broker.id==屬性是在集群中唯一且永久的名稱。我們需要覆蓋端口以及log的目錄,因為我們當前是準備在當前機器運行這些節點。
現在我們啟動這兩個新的節點、使用新的配置文件:
> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...windowd平臺:
.\bin\windows\kafka-server-start.bat .\config\server-1.properties.\bin\windows\kafka-server-start.bat .\config\server-2.properties現在我們來創建一個新的topic使用復制因子為3:
> bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topicwindows平臺;
.\bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topicOK,現在我們已經有一個集群了,但是我們如何知道每個broker在干什么呢? 可以使用 “describe topics” 命令可以查看:
> bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0windows平臺:
.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0關于輸出的結果我們來解釋下:
- 第一行給出了關于所有分區的概括,每一個額外的行給出了每一個分區的信息。因為我們的這個topic僅僅有一個分區所以只有一行。
- ==leader== 節點負責該分區所有的的讀、寫操作。每一個節點都有可能作為該分區的隨機選舉后的leader。
- ==replicas== 副本是復制該分區日志的節點的列表,無論他們是領導者還是當前活動的節點。
- ==isr== 是一組同步副本。是replicas的子集,當前是活動的并且和leader同步的。
我們也可以使用同樣的命令查看原來的test這個topic:
.\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic testTopic:test PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0無需意外,因為原來的test這個topic沒有副本僅僅在server 0中。
==容錯測試==
-
關閉一個節點,再查看狀態
- > ps aux | grep server-1.properties 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java... > kill -9 7564
-
windows平臺:
- wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid ProcessId 20384taskkill /pid 20384 /f 成功: 已終止 PID 為 20384 的進程。
-
現在集群關系可能變更了,我們來看看第二個topic的狀態,發現又一個節點已經沒有保持同步狀態了:
- .\bin\windows\kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,0
-
但是消息還是可以進行發送消費的
總結
kafka是一個消息隊列,主要用于流式數據處理,可以利用它做實時分析、離線分析日志系統平臺。
?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
- 上一篇: RocketMQ带你快速入门
- 下一篇: rocketmq中各个角色介绍