日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Flink从入门到放弃之源码解析系列-第1章 Flink组件和逻辑计划

發布時間:2024/4/14 编程问答 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink从入门到放弃之源码解析系列-第1章 Flink组件和逻辑计划 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

>請戳GitHub原文: github.com/wangzhiwubi…


本文參考了網上很多博客,大多數博客都是基于1.1.0版本的,已經嚴重滯后,本系列文章做了很多訂正,歡迎大家指正。

概要和背景

flink是一個被譽為 the 4th G 的計算框架,不同的框架特性及其代表項目列表如下:

第一代

第二代

第三代

第四代

Batch

BatchInteractive

Batch Interactive Near-Real-TimeInterative-processing

Hybrid Interactive Real-Time-StreamingNative-Iterative-processing


DAG Dataflows

RDD

Cyclic Dataflows

Hadoop MapReduce

TEZ

Spark

Flink

本文主要介紹flink的核心組件以及物理計劃的生成過程

參考代碼分支 flink-1.7.1 本系列大概有7-10章,那3章是留給阿里開源的Blink的新特性的。。。

核心組件介紹

這里只介紹 on yarn 模式下的組件flink 的 on yarn 模式支持兩種不同的類型:
  • 單作業單集群
  • 多作業單集群
  • 首先介紹 單作業單集群 的架構,單作業單集群下一個正常的 flink 程序會擁有以下組件job Cli: 非 detatched 模式下的客戶端進程,用以獲取 yarn Application Master 的運行狀態并將日志輸出掉終端JobManager[JM]: 負責作業的運行時計劃 ExecutionGraph 的生成、物理計劃生成和作業調度TaskManager[TM]: 負責被分發 task 的執行、心跳/狀態上報、資源管理Tips: 啟動Flink Yarn Session有2種模式:分離模式、客戶端模式 通過-d指定分離模式,即客戶端在啟動Flink Yarn Session后,就不再屬于Yarn Cluster的一部分。如果想要停止Flink Yarn Application,需要通過yarn application -kill 命令來停止。 整體的架構大致如下圖所示:

    下面將以一次 Job 的提交過程描述 flink 的各組件的作用及協同

    作業提交流程分析

    單作業單集群模式下,一個作業會啟動一個 JM,并依據用戶的參數傳遞啟動相應數量的 TM,每個 TM 運行在 yarn 的一個 container 中,一個通常的 flink on yarn 提交命令:./bin/flink run -m yarn-cluster -yn 2 -j flink-demo-1.0.0-with-dependencies.jar —ytm 1024 -yst 4 -yjm 1024 —yarnname flink_demo flink 在收到這樣一條命令后會首先通過 Cli 獲取 flink 的配置,并解析命令行參數。

    配置加載

    CliFrontend.java 是 flink 提交作業的入口//CliFrontend line144// 1. find the configuration directoryfinal String configurationDirectory = getConfigurationDirectoryFromEnv();
    這里會嘗試加載 conf 文件夾下的所有 yaml 文件,配置文件的命名并沒有強制限制

    參數解析

    解析命令行參數的第一步是路由用戶的命令,然后交由run方法去處理//CliFrontend line1119try { final CliFrontend cli = new CliFrontend( configuration,customCommandLines);
    SecurityUtils.install(new SecurityConfiguration(cli.configuration)); int retCode = SecurityUtils.getInstalledContext() .runSecured(() -> cli.parseParameters(args)); System.exit(retCode);}catch (Throwable t) { final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class); LOG.error("Fatal error while running command line interface.", strippedThrowable); strippedThrowable.printStackTrace();System.exit(31);}
    //CliFrontend line1046try { // do action switch (action) { case ACTION_RUN: run(params);return 0; case ACTION_LIST: list(params);return 0; case ACTION_INFO: info(params);return 0; case ACTION_CANCEL: cancel(params);return 0; case ACTION_STOP: stop(params);return 0; case ACTION_SAVEPOINT: savepoint(params);return 0; case ACTION_MODIFY: modify(params);return 0; case "-h": case "--help": CliFrontendParser.printHelp(customCommandLines);return 0; case "-v": case "--version": String version = EnvironmentInformation.getVersion();String commitID = EnvironmentInformation.getRevisionInformation().commitId;System.out.print("Version: " + version); System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID); return 0; default: System.out.printf("\"%s\" is not a valid action.\n", action); System.out.println();System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\"."); System.out.println();System.out.println("Specify the version option (-v or --version) to print Flink version.");System.out.println();System.out.println("Specify the help option (-h or --help) to get help on the command.");return 1; }} catch (CliArgsException ce) { return handleArgException(ce); } catch (ProgramParametrizationException ppe) { return handleParametrizationException(ppe); } catch (ProgramMissingJobException pmje) { return handleMissingJobException(); } catch (Exception e) { return handleError(e); }}接下來是程序參數設置過程,flink 將 jar包路徑和參數配置封裝成了 PackagedProgram

    //CliFrontend line201final PackagedProgram program;

    flink集群的構建

    集群類型的解析
    獲取參數后下一步就是集群的構建和部署,flink 通過 兩個不同的 CustomCommandLine 來實現不同集群模式的解析,分別是 FlinkYarnSessionCli和 DefaultCLI 解析命令行參數//CliFrontend line1187final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli"; try { customCommandLines.add(loadCustomCommandLine(flinkYarnSessionCLI, configuration,configurationDirectory,"y", "yarn")); } catch (NoClassDefFoundError | Exception e) { LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e); }
    customCommandLines.add(new DefaultCLI(configuration));
    return customCommandLines; ...//line210 這里將決定Cli的類型final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine); 那么什么時候解析成 Yarn Cluster 什么時候解析成 Standalone 呢?由于FlinkYarnSessionCli被優先添加到customCommandLine,所以會先觸發下面這段邏輯//FlinkYarnSessionCli line422@Overridepublic boolean isActive(CommandLine commandLine) { String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null); boolean yarnJobManager = ID.equals(jobManagerOption); boolean yarnAppId = commandLine.hasOption(applicationId.getOpt()); return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null); }從上面可以看出如果用戶傳入了 -m參數或者application id或者配置了yarn properties 文件,則啟動yarn cluster模式,否則是Standalone模式的集群
    集群部署
    flink通過YarnClusterDescriptor來描述yarn集群的部署配置,具體對應的配置文件為flink-conf.yaml,通過下面這段邏輯觸發集群部署://YarnClusterDescriptor line39public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {
    public YarnClusterDescriptor( Configuration flinkConfiguration,YarnConfiguration yarnConfiguration,String configurationDirectory,YarnClient yarnClient,boolean sharedYarnClient) { super( flinkConfiguration,yarnConfiguration,configurationDirectory,yarnClient,sharedYarnClient);}//AbstractYarnClusterDescriptor 471protected ClusterClient<ApplicationId> deployInternal( ClusterSpecification clusterSpecification,String applicationName,String yarnClusterEntrypoint,@Nullable JobGraph jobGraph, boolean detached) throws Exception { 大致過程:
    • check yarn 集群隊列資源是否滿足請求
    • 設置 AM Context、啟動命令、submission context
    • 通過 yarn client submit am context
    • 將yarn client 及相關配置封裝成 YarnClusterClient 返回
    真正在 AM 中運行的主類是 YarnApplicationMasterRunner,它的 run方法做了如下工作:
    • 啟動JobManager ActorSystem
    • 啟動 flink ui
    • 啟動YarnFlinkResourceManager來負責與yarn的ResourceManager交互,管理yarn資源
    • 啟動 actor System supervise 進程
    到這里 JobManager 已經啟動起來這樣一個 flink 集群便構建出來了。下面附圖解釋下這個流程:
  • flink cli 解析本地環境配置,啟動 ApplicationMaster
  • 在 ApplicationMaster 中啟動 JobManager
  • 在 ApplicationMaster 中啟動YarnFlinkResourceManager
  • YarnFlinkResourceManager給JobManager發送注冊信息
  • YarnFlinkResourceManager注冊成功后,JobManager給YarnFlinkResourceManager發送注冊成功信息
  • YarnFlinkResourceManage知道自己注冊成功后像ResourceManager申請和TaskManager數量對等的 container
  • 在container中啟動TaskManager
  • TaskManager將自己注冊到JobManager中


  • 接下來便是程序的提交和運行程序在CliFrontend中被提交后,會觸發這樣一段邏輯//ClusterClient 39public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException { Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());if (prog.isUsingProgramEntryPoint()) {
    final JobWithJars jobWithJars; if (hasUserJarsInClassPath(prog.getAllLibraries())) { jobWithJars = prog.getPlanWithoutJars();} else { jobWithJars = prog.getPlanWithJars();}
    return run(jobWithJars, parallelism, prog.getSavepointSettings()); }else if (prog.isUsingInteractiveMode()) { log.info("Starting program in interactive mode (detached: {})", isDetached());
    final List<URL> libraries; if (hasUserJarsInClassPath(prog.getAllLibraries())) { libraries = Collections.emptyList();} else { libraries = prog.getAllLibraries();}
    ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries, prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),prog.getSavepointSettings());ContextEnvironment.setAsContext(factory);
    try { // invoke main method prog.invokeInteractiveModeForExecution(); if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) { throw new ProgramMissingJobException("The program didn't contain a Flink job."); }if (isDetached()) { // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute(); }else { // in blocking mode, we execute all Flink jobs contained in the user code and then return here return this.lastJobExecutionResult; }}finally { ContextEnvironment.unsetContext();}}else { throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode."); }}注意到有一段prog.invokeInteractiveModeForExecution(),這是客戶端生成初步邏輯計劃的核心邏輯,下面將詳細介紹

    客戶端邏輯計劃

    上面提到prog.invokeInteractiveModeForExecution()這段邏輯會觸發客戶端邏輯計劃的生成,那么是怎樣一個過程呢?其實這里只是調用了用戶jar包的主函數,真正的觸發生成過程由用戶代碼的執行來完成。例如用戶寫了這樣一段 flink 代碼:object FlinkDemo extends App with Logging{ override def main(args: Array[String]): Unit ={ val properties = new Properties properties.setProperty("bootstrap.servers", DemoConfig.kafkaBrokerList)
    properties.setProperty("zookeeper.connect","host01:2181,host02:2181,host03:2181/kafka08")properties.setProperty("group.id", "flink-demo")
    val env = StreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE) //checkpoint every 5 seconds. val stream = env.addSource(new FlinkKafkaConsumer08[String]("log.waimai_e", new SimpleStringSchema, properties)).setParallelism(2) val counts = stream.name("log.waimai_e").map(toPoiIdTuple(_)).filter(_._2 != null) .keyBy(0).timeWindow(Time.seconds(5)).sum(1)
    counts.addSink(sendToKafka(_))env.execute()}注意到這樣一段val env = StreamExecutionEnvironment.getExecutionEnvironment,這段代碼會獲取客戶端的環境配置,它首先會轉到這樣一段邏輯://StreamExecutionEnvironment 1256public static StreamExecutionEnvironment getExecutionEnvironment() { if (contextEnvironmentFactory != null) { return contextEnvironmentFactory.createExecutionEnvironment(); }
    // because the streaming project depends on "flink-clients" (and not the other way around) // we currently need to intercept the data set environment and create a dependent stream env. // this should be fixed once we rework the project dependencies ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); ExecutionEnvironment.getExecutionEnvironment();獲取環境的邏輯如下://ExecutionEnvironment line1137public static ExecutionEnvironment getExecutionEnvironment() { return contextEnvironmentFactory == null ? createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment(); }這里的contextEnvironmentFactory是一個靜態成員,早在ContextEnvironment.setAsContext(factory)已經觸發過初始化了,其中包含了如下的環境信息://ContextEnvironmentFactory line51public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach, List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism, boolean isDetached, String savepointPath) {this.client = client; this.jarFilesToAttach = jarFilesToAttach; this.classpathsToAttach = classpathsToAttach; this.userCodeClassLoader = userCodeClassLoader; this.defaultParallelism = defaultParallelism; this.isDetached = isDetached; this.savepointPath = savepointPath; }其中的 client 就是上面生成的 YarnClusterClient,其它的意思較明顯,就不多做解釋了。用戶在執行val env = StreamExecutionEnvironment.getExecutionEnvironment這樣一段邏輯后會得到一個StreamContextEnvironment,其中封裝了 streaming 的一些執行配置 【buffer time out等】,另外保存了上面提到的 ContextEnvironment 的引用。到這里關于 streaming 需要的執行環境信息已經設置完成。

    初步邏輯計劃 StreamGraph 的生成

    接下來用戶代碼執行到DataStream<String> stream = env.addSource(consumer);這段邏輯實際會生成一個DataStream抽象,DataStream是flink關于streaming抽象的最核心抽象,后續所有的算子轉換都會在DataStream上來完成,上面的addSource操作會觸發下面這段邏輯:public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
    if (typeInfo == null) { if (function instanceof ResultTypeQueryable) { typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType(); } else { try { typeInfo = TypeExtractor.createTypeInfo( SourceFunction.class, function.getClass(), 0, null, null); } catch (final InvalidTypesException e) { typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e); } } }
    boolean isParallel = function instanceof ParallelSourceFunction;
    clean(function); StreamSource<OUT, ?> sourceOperator; if (function instanceof StoppableFunction) { sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function)); } else { sourceOperator = new StreamSource<>(function); }
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); } 簡要總結下上面的邏輯:
    • 獲取數據源 source 的 output 信息 TypeInformation
    • 生成 StreamSource sourceOperator
    • 生成 DataStreamSource【封裝了 sourceOperator】,并返回
    • 將 StreamTransformation 添加到算子列表 transformations 中【只有 轉換 transform 操作才會添加算子,其它都只是暫時做了 transformation 的疊加封裝】
    • 后續會在 DataStream 上做操作
    DataStreamSource 是一個 DataStream 數據流抽象,StreamSource 是一個 StreamOperator 算子抽象,在 flink 中一個 DataStream 封裝了一次數據流轉換,一個 StreamOperator 封裝了一個函數接口,比如 map、reduce、keyBy等。關于算子的介紹會另起一節:flink算子的生命周期可以看到在 DataStream 上可以進行一系列的操作(map filter 等),來看一個常規操作比如 map 會發生什么://DataStream 583public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), Utils.getCallLocationName(), true);
    return transform("Map", outType, new StreamMap<>(clean(mapper))); }一個map操作會觸發一次 transform,那么transform做了什么工作呢?//DataStream line1175@PublicEvolvingpublic <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    // read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType();
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName,operator,outTypeInfo,environment.getParallelism());
    @SuppressWarnings({ "unchecked", "rawtypes" }) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
    getExecutionEnvironment().addOperator(resultTransform);
    return returnStream; }這一步生成了一個 StreamTransformation并以此作為成員變量封裝成另一個 DataStream 返回,StreamTransformation是 flink關于數據流轉換的核心抽象,只有需要 transform 的流才會生成新的DataStream 算子,后面會詳細解釋,注意上面有這一行getExecutionEnvironment().addOperator(resultTransform)flink會將transformation維護起來://StreamExecutionEnvironment line 1576@Internalpublic void addOperator(StreamTransformation<?> transformation) { Preconditions.checkNotNull(transformation, "transformation must not be null."); this.transformations.add(transformation); }所以,用戶的一連串操作 map join等實際上在 DataStream 上做了轉換,并且flink將這些 StreamTransformation 維護起來,一直到最后,用戶執行 env.execute()這樣一段邏輯,StreamGraph 的構建才算真正開始...用戶在執行 env.execute()會觸發這樣一段邏輯://StreamContextEnvironment line32public JobExecutionResult execute(String jobName) throws Exception { Preconditions.checkNotNull("Streaming Job name should not be null.");
    StreamGraph streamGraph = this.getStreamGraph(); streamGraph.setJobName(jobName);
    transformations.clear();
    // execute the programs if (ctx instanceof DetachedEnvironment) { LOG.warn("Job was executed in detached mode, the results will be available on completion."); ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE; } else { return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath()); }}}這段代碼做了兩件事情:
    • 首先使用 StreamGraphGenerator 產生 StreamGraph
    • 使用 Client 運行 stream graph
    那么 StreamGraphGenerator 做了哪些操作呢?StreamGraphGenerator會依據添加算子時保存的 transformations 信息生成 job graph 中的節點,并創建節點連接,分流操作 如 union,select,split 不會添加邊,只會創建虛擬節點或在上有節點添加 selector這里會將 StreamTransformation 轉換為 StreamNode,StreamNode 保存了算子的信息,如下圖所示到這里由 StreamNode 構成的 DAG 圖 StreamGraph就生成了不過 在提交給 client 的時候,flink 會做進一步的優化:StreamGraph 將進一步轉換為 JobGraph,這一步工作由 StreamingJobGraphGenerator 來完成,為什么要做這一步轉換呢?主要因為有可以 chain 的算子,這里進一步將 StreamNode 轉換為 JobVertex,主要工作是將可以 chain 的算子合并【這一步優化是默認打開的】,并設置資源,重試策略等,最終生成可以提交給 JobManager 的 JobGraphTips: JobVertex:經過優化后符合條件的多個StreamNode可能會chain在一起生成一個JobVertex,即一個JobVertex包含一個或多個operator,JobVertex的輸入是JobEdge,輸出是IntermediateDataSet。 IntermediateDataSet:表示JobVertex的輸出,即經過operator處理產生的數據集。producer是JobVertex,consumer是JobEdge。 JobEdge:代表了job graph中的一條數據傳輸通道。source 是 IntermediateDataSet,target 是 JobVertex。即數據通過JobEdge由IntermediateDataSet傳遞給目標JobVertex。


    總結

    以上是生活随笔為你收集整理的Flink从入门到放弃之源码解析系列-第1章 Flink组件和逻辑计划的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。