Flink运行时架构
1 運行時相關的組件
? ? Flink運行時架構主要包括四個不同的組件:作業管理器(JobManager)、資源管理器(ResourceManager)、任務管理器(TaskManager),以及分發器(Dispatcher)
??(1)資源管理器(ResourceManager)
??主要負責TaskManager的slot(插槽),slot是Flink中處理資源的單元。Flink為不同的環境和資源管理工具提供了不同資源管理器。當JobManager申請插槽資源的時候,ResourceManager會將有空閑插槽的TaskManager分配給JobManager。如果沒有足夠的插槽,可以向資源提供平臺發起會話,以提供啟動TaskManager進程的容器。還負責終止空閑的TaskManager,釋放計算資源。
??(2)JobManager
??協調分布式計算,負責調度任務、協調 checkpoints、協調故障恢復等。每個 Job 至少會有一個 JobManager。高可用部署下會有多個 JobManagers,其中一個作為 leader,其余處于 standby 狀態。
??每個應用程序都會被一個不同的JobManager所控制執行,是控制每一個應用程序執行的主進程。JobManager接收要執行的應用程序,包括作業圖(JobGraph)、邏輯數據流圖(logical dataflow graph)和打包了所有的類、庫和其它資源的JAR包等。JobManager將JobGraph轉換成物理層面的數據流圖也叫執行圖(ExecutionGraph),執行圖包含了所有可以并行進行執行的任務。當JobManager向資源管理器請求完執行任務需要的資源(TaskManager上的slot)時就會將執行圖分發到真正運行它們的TaskManager上,JobManager還需要負責所有需要中央協調的操作。
??(3)TaskManager
??用來執行 dataflow 中的 tasks(準確來說是 subtasks ),并且緩存和交換數據 streams。每個 Job 至少會有一個 TaskManager。
??每一個TaskManager都包含了一定數量的插槽(slots)。插槽的數量限制了TaskManager能夠執行的任務數量。每當TaskManager啟動后都會向資源管理器注冊它的插槽。當資源管理器向它發出提供slot指令后TaskManager就會將一個或者多個插槽提供給JobManager調用,JobManager就可以向插槽分配任務(tasks)來執行了。在執行過程中,同一應用程序的TaskManager之間可以交換數據。
??(4)Dispatcher
??Dispatcher為應用提交提供了REST接口。當一個應用被提交執行時,分發器就會啟動并將應用移交給一個JobManager,可以跨作業運行。Dispatcher是REST接口,所以可以作為集群的一個HTTP接入點,這樣就能夠不受防火墻阻擋。Dispatcher也會啟動一個Web UI,用來方便地展示和監控作業執行的信息。Dispatcher在架構中可能并不是必需的,這取決于應用提交運行的方式。
2 任務提交流程
??當一個任務提交時,較高層次的各運行時組件的交互如下:
??(1)客戶端提交應用
??(2)分發器就會啟動并將應用移交給一個JobManager
??(3)JobManager向ResourceManager申請slots
??(4)ResourceManager啟動TaskManager
??(5)TaskManager啟動后向ResourceManager注冊slots
??(6)ResourceManager向TaskManager發出提供slot的指令
??(7)TaskManager向JobManager提供slots
??(8)JobManager在TaskManager提供的slots中提交要執行的任務
??(9)在執行任務過程中TaskManager之間交互數據
??當部署的集群環境不同(YARN,Mesos,Kubernetes,standalone等),上述步驟會有所不同,如果我們將Flink集群部署到YARN上,提交流程如下:
??(1)Client首先把jar包和配置上傳到hdfs里
??(2)Client提交job到ResourceManager
??(3)ResourceManager分配container資源,通知對應的NodeManager啟動ApplicationMaster
??(4)ApplicationMaster啟動后加載jar包和配置構建環境,啟動JobManager
??(5)ApplicationMaster向ResourceManager申請啟動TaskManager
??(6)ResourceManager分配container之后,ApplicationMaster通知資源所在節點的NodeManager啟動TaskManager
??(7)NodeManager加載jar包和配置構建環境,并啟動TaskManager
??(8)TaskManager啟動后向jobManager發送心跳包,并等待JobManager向其分配任務
3 任務調度
3.1 Job Managers、Task Managers、Clients
??一個Flink代碼首先生成的是一個數據流圖DataFlow graph,然后在Client客戶端經過一些處理之后把它提交給JobManager;JobManager上就會把它結合并行度生成一個執行圖,然后就知道了要多少個TaskManager,要多少個TaskSlots;申請到足夠的資源后就把對應的任務分配到相應的TaskSlots。(注意:每個TaskManager里面可以包含多個TaskSlot,TaskSlot里面到底執行什么Task就看JobManager生成的執行圖是什么樣的,這就涉及TaskManager和Slots的概念
??client:是提交job的客戶端,用于準備并發送dataflow(JobGraph)給Master(JobManager),可以運行在任何機器上,只要能與JobManager環境相連即可,提交job后,client可以結束進程也可以維持連接以等待接收計算結果。
??JobManager:負責Job的調度,并協調task做checkpoint,獲得client提交的的job和jar包等資源后,會生成優化后的執行計劃,以task單元調度到各個TaskManager去執行。
??TaskManager:在啟動的時候就已經設置好槽位數Slot,每個slot能啟動一個task,task為線程,從JobManager接收需要部署的task,部署啟動后為上游建立Netty 連接,接收數據并處理。
3.2 TaskManager、Slots
??TaskManager是一個進程,在Slots上執行的task是一個線程。也就是一個TaskManager是一個JVM進程,可以在里面啟動多個線程執行任務。每個任務要在固定的集合資源中運行,這個資源就是slots。
??所以Slot就是我們執行每個任務線程的資源,而且這個線程相當于是直接劃分好給定的資源,所以每個TaskSlot是表示每個TaskManager擁有的固定大小的子集。如果一個TaskManager有3個TaskSlot就要把自己的內存分成3份給slot,所以Slot之間內存是獨享的。所以某個線程掛了不影響其他的,但是CPU是不獨享的,這也就是為什么建議把Slot數量配置成CPU核心數的原因。這樣在4核的CPU上跑4個獨立的線程,默認每個線程占用一個核心做處理,不會出現cpu輪轉競爭資源,所以slot數量最好把他配成CPU核心數。
??上圖是先把source,map合成一個大任務,然后后面keyby一個窗口做聚合,最后是sink,這是3個任務。這里前面2個任務并行度都是2,總共是5個任務,那么是不是這5個任務不是都占用一個獨立的TaskSlot。假如一個任務對cpu占用比較少,內存消耗也比較少,另外一個任務cpu占用比較多,導致有些任務很快執行完,有些很慢,這樣資源利用率是不高的。我們可以把不同的任務共享一個slot,效果如下
??假設現在并行度是6,總共有13個任務,不需要有13個slot依次排開,因為slot里面可以不同的任務去共享slot,這種共享的方式可以提高資源的利用率的。資源共享還有一個效果:整個處理流程里面相當于,假設所有的slot并行度都是6,每一個slot里面都能包含所有完整的操作步驟,這相當于只要留著一個slot就可以把整個數據操作管道全保存下來,完整的數據流程還是可以留下來的。
??可以發現這里面有數據要傳輸到別的slot上,甚至要跨TaskManager傳輸的話,這要做序列化反序列化,這個過程會降低效率。這就是為什么后面有些任務要合并在一起,本來是不同的操作,如果合并在一起相當于他們之間的數據傳輸就變成一個本地調用了,不需要再去跨slot傳輸,沒有序列化與反序列化,這就是合并的過程,算子鏈。
??注意:(1)對于流處理程序而言,需要占用的slot數量就是整個處理流程中,最大的那個并行度(3)Task Slot與parallelis的區別:Task Slot是靜態的概念,是指TaskManager具有的并發執行能力,可以通過參數taskmanager.numberOfTaskSlots進行配置;而并行度parallelism是動態概念,即TaskManager運行程序時實際使用的并發能力,可以通過參數parallelism.default進行配置。(3)并行的概念。①數據并行:同樣的一個任務,不同的并行子任務,同時處理不同的數據②任務并行:同一時間不同的slot在執行不同的任務。
3.3 數據傳輸形式與并行度
??在執行過程中,一個流(stream)包含一個或多個分區(stream partition),而每一個算子(operator)可以包含一個或多個子任務(operator subtask),這些子任務在不同的線程、不同的物理機或不同的容器中彼此互不依賴地執行。
??一個特定算子的子任務(subtask)的個數被稱之為其并行度(parallelism)。一般情況下,一個流程序的并行度,可以認為就是其所有算子中最大的并行度。不同的算子可能具有不同的并行度,所以算子之間傳輸數據的形式也不一樣,可以分為one-to-one和redistributing,具體是哪一種形式取決于算子的種類。
??(1)one-to-one:stream維護著分區以及元素的順序,如source和map之間,這意味著map算子的子任務看到的元素的個數和順序與source算子的子任務生產的元素的個數,順序相同。像map,fliter,flatMap等算子都是one-to-one的形式,類似于spark中的窄依賴
??(2)redistributing:stream的分區會發生改變,每一個算子的子任務根據所選擇的transformation發送到不同的目標任務。如keyBy是基于hashCode重分區,而broadcast和rebalance會隨機重分區,這些算子都會引起redistributed,其實就類似于Spark中的Shuffle過程,類似于spark中的寬依賴
3.4 任務和算子鏈
??分布式計算中,Flink 將算子(operator)的 subtask *鏈接(chain)*成 task。每個 task 由一個線程執行。相同并行度的one to one操作,Flink這樣相連的算子鏈接在一起形成一個task,原來的算子成為里面的一部分。把算子鏈接成 tasks 能夠減少線程間切換和緩沖的開銷,在降低延遲的同時提高了整體吞吐量。
??下圖的 dataflow 由五個 subtasks 執行,因此具有五個并行線程。
??source讀取數據源,在map后面就hashcode重分區,keyBy做聚合(keyBy本身并不是一個操作,只是定義重分區的模式),然后就想窗口操作,最后sink輸出。
??這里map到后面的窗口操作是要重分區的。假設在代碼里面設置如上并行度,本來的3步操作就分成7個任務,這7個任務因為source和map是one-to-one操作,所以連接在一起,就變成5個子任務。不同的子任務可以共享一個slot,所以其實有2個slot就可以用。整個處理過程中,最大的并行度就是當前需要的slot數量。
??任務之間數據傳輸看操作,source到map本身是窄依賴是one-to-one的操作,并且并行度相同,那么他們可以合并。map到window操作本身是寬依賴,并行度相同也不能合并。window到sink不僅并行度不相同還是窄依賴所以不可以合并
??注意:只有并行度相同,并且是one-to-one類型的數據傳輸,才可以把多個算子合并成一個任務。
3.5 數據流(DataFlow)
??所有的Flink程序都是由三部分組成的: Source(讀取數據源) 、Transformation(利用各種算子進行處理加工)和Sink(輸出)
??程序運行時,Flink上運行的程序會被映射成邏輯數據流(dataflows),每一個dataflow以一個或多個sources開始以一個或多個sinks結束。dataflow類似于任意的有向無環圖DAG。在程序中的transformations跟dataflow中的算子(operator)可以是一一對應的關系,也可以是一個transformation可能對應多個operator。
3.6 執行圖(ExecutionGraph)
??Flink程序直接映射成的數據流圖是StreamGraph,也被稱為邏輯流圖,因為它們表示的是計算邏輯的高級視圖。為了執行一個流處理程序,Flink需要將邏輯流圖轉換為物理數據流圖(也叫執行圖),詳細說明程序的執行方式。
??整個業務的完成,其實就是執行圖的逐漸優化的過程,Flink的執行圖可分為4層:StreamGraph->JobGraph->ExecutionGraph->物理執行圖
??StreamGraph:根據用戶編寫的Stream API編寫的代碼生成的最初的圖,也就是上面的dataflow,用來表示程序的拓撲結構。
??JobGraph:StreamGraph經過優化后就生成了JobGraph,是提交給JobManager的數據結構,主要的優化是:將多個符合條件的節點chain在一起作為一個節點,就可以減少數據在節點之間的流動所需的序列化/反序列化/傳輸消耗。
??ExecutionGraph:JobManager根據JobGraph生成ExecutionGraph,ExecutionGraph是JobGraph的并行化版本,是調度層核心的數據結構。
物理執行圖:ExecutionGraph已經是可以執行的了,JobManager根據ExecutionGraph對Job進行調度后,在各個TaskManager上會把這個圖轉換成最終在每個slot上要執行的代碼,不是具體的數據結構。
??Flink的執行圖流程如下:①最初按照代碼生成的streamGraph(dataflowGraph),對應每一個算子每一步操作都是一個任務;②接下來在Client上會直接生成JobGraph,這步是把符合要求的任務合并在一起,串成一個任務;③JobGraph會提交給JobManager,JobManager會按照當前的并行度把他拆開,這里并行度不一樣就涉及怎樣傳輸,生成真正可以執行的ExecutorGraph;③ExecutorGraph傳給TaskManager去執行生成真正的物理執行圖。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-1naR3qDR-1595864632942)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\ksohtml\wps6090.tmp.jpg)]
3.7 狀態后端(State Backends)
??key/values 索引存儲的數據結構取決于 state backend的選擇。一類 state backend 將數據存儲在內存的哈希映射中,另一類 state backend 使用 RocksDB作為鍵/值存儲。除了定義保存狀態(state)的數據結構之外, state backend 還實現了獲取鍵/值狀態的時間點快照的邏輯,并將該快照存儲為 checkpoint 的一部分。
總結
以上是生活随笔為你收集整理的Flink运行时架构的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java的注释、关键字、标识符、变量常量
- 下一篇: ARM的UART实验