Flink from Getting Started to Giving Up: Source Code Series, Chapter 1 - Flink Components and the Logical Plan
> Original on GitHub: github.com/wangzhiwubi…

This article draws on many blog posts found online, most of which are based on version 1.1.0 and are now badly out of date. This series makes many corrections; further corrections from readers are welcome.
Overview and Background

Flink is a compute framework often hailed as "the 4th generation". The characteristics of each framework generation and their representative projects are listed below:
| 1st Generation | 2nd Generation | 3rd Generation | 4th Generation |
| --- | --- | --- | --- |
| Batch | Batch, Interactive | Batch, Interactive, Near-Real-Time, Iterative processing | Hybrid, Interactive, Real-Time Streaming, Native Iterative processing |
| | DAG Dataflows | RDD | Cyclic Dataflows |
| Hadoop MapReduce | TEZ | Spark | Flink |
This article mainly introduces Flink's core components and how the client-side logical plan is generated.

The reference code branch is flink-1.7.1. This series will have roughly 7-10 chapters, 3 of which are reserved for the new features of Blink, open-sourced by Alibaba.
Core Components

Only the components of the on-YARN mode are covered here. Flink's on-YARN mode supports two different types: a long-running session cluster (yarn-session) and a per-job cluster. Below, a single job submission is followed end to end to describe the role of each Flink component and how they work together.
Job Submission Flow

In per-job mode (one cluster per job), a job starts one JobManager (JM) and, based on the parameters the user passes, a corresponding number of TaskManagers (TM); each TM runs in its own YARN container. A typical Flink on YARN submit command looks like this:

```bash
./bin/flink run -m yarn-cluster -yn 2 -j flink-demo-1.0.0-with-dependencies.jar -ytm 1024 -ys 4 -yjm 1024 --yarnname flink_demo
```

On receiving such a command, Flink first loads its configuration through the CLI and then parses the command-line arguments.

Configuration Loading
CliFrontend.java is the entry point for Flink job submission:

```java
// CliFrontend line 144
// 1. find the configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
```

This attempts to load the YAML files under the conf folder; the naming of the configuration files is not strictly enforced.
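For reference, the resolution order used by getConfigurationDirectoryFromEnv is: the FLINK_CONF_DIR environment variable, then ../conf, then conf relative to the working directory. Below is a minimal, self-contained sketch of that order; the class and method names are illustrative, not Flink's:

```java
import java.io.File;

public class ConfDirResolver {

    /**
     * Illustrative re-implementation of the lookup performed by
     * CliFrontend#getConfigurationDirectoryFromEnv(): prefer the FLINK_CONF_DIR
     * environment variable, then fall back to ../conf and ./conf.
     */
    static String resolveConfigurationDirectory() {
        String location = System.getenv("FLINK_CONF_DIR");
        if (location != null && !location.isEmpty()) {
            return location;                  // explicit override wins
        } else if (new File("../conf").exists()) {
            return "../conf";                 // started from FLINK_HOME/bin
        } else if (new File("conf").exists()) {
            return "conf";                    // started from FLINK_HOME
        }
        throw new RuntimeException("The configuration directory was not found.");
    }

    public static void main(String[] args) {
        System.out.println("Using configuration directory: " + resolveConfigurationDirectory());
    }
}
```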
Argument Parsing

The first step in handling the command-line arguments is to route the user's command to the right action, which for a submission ends up in the run method:

```java
// CliFrontend line 1119
try {
    final CliFrontend cli = new CliFrontend(
        configuration,
        customCommandLines);

    SecurityUtils.install(new SecurityConfiguration(cli.configuration));
    int retCode = SecurityUtils.getInstalledContext()
        .runSecured(() -> cli.parseParameters(args));
    System.exit(retCode);
}
catch (Throwable t) {
    final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
    LOG.error("Fatal error while running command line interface.", strippedThrowable);
    strippedThrowable.printStackTrace();
    System.exit(31);
}
```
```java
// CliFrontend line 1046
try {
    // do action
    switch (action) {
        case ACTION_RUN:
            run(params);
            return 0;
        case ACTION_LIST:
            list(params);
            return 0;
        case ACTION_INFO:
            info(params);
            return 0;
        case ACTION_CANCEL:
            cancel(params);
            return 0;
        case ACTION_STOP:
            stop(params);
            return 0;
        case ACTION_SAVEPOINT:
            savepoint(params);
            return 0;
        case ACTION_MODIFY:
            modify(params);
            return 0;
        case "-h":
        case "--help":
            CliFrontendParser.printHelp(customCommandLines);
            return 0;
        case "-v":
        case "--version":
            String version = EnvironmentInformation.getVersion();
            String commitID = EnvironmentInformation.getRevisionInformation().commitId;
            System.out.print("Version: " + version);
            System.out.println(commitID.equals(EnvironmentInformation.UNKNOWN) ? "" : ", Commit ID: " + commitID);
            return 0;
        default:
            System.out.printf("\"%s\" is not a valid action.\n", action);
            System.out.println();
            System.out.println("Valid actions are \"run\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
            System.out.println();
            System.out.println("Specify the version option (-v or --version) to print Flink version.");
            System.out.println();
            System.out.println("Specify the help option (-h or --help) to get help on the command.");
            return 1;
    }
} catch (CliArgsException ce) {
    return handleArgException(ce);
} catch (ProgramParametrizationException ppe) {
    return handleParametrizationException(ppe);
} catch (ProgramMissingJobException pmje) {
    return handleMissingJobException();
} catch (Exception e) {
    return handleError(e);
}
```

Next comes setting up the program arguments: Flink wraps the jar path and the argument configuration into a PackagedProgram:

```java
// CliFrontend line 201
final PackagedProgram program;
```
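A rough sketch of what building that PackagedProgram amounts to is shown below. The jar path and the empty argument list are placeholders, and the constructor overload used here is the one flink-clients 1.7 exposes as I understand it (jar file, extra classpaths, program arguments); when no entry class is given, the main class is read from the jar manifest:

```java
import java.io.File;
import java.net.URL;
import java.util.Collections;
import java.util.List;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;

public class BuildProgramSketch {

    public static void main(String[] args) throws ProgramInvocationException {
        // Illustrative values; in CliFrontend these come from the parsed run options.
        File jarFile = new File("flink-demo-1.0.0-with-dependencies.jar");
        List<URL> classpaths = Collections.emptyList();
        String[] programArgs = new String[0];

        // No explicit entry class: the main class is taken from the jar's manifest.
        PackagedProgram program = new PackagedProgram(jarFile, classpaths, programArgs);

        System.out.println("Entry point: " + program.getMainClassName());
    }
}
```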
Building the Flink Cluster

Resolving the Cluster Type

After the arguments have been obtained, the next step is building and deploying the cluster. Flink resolves the different cluster modes through two different CustomCommandLine implementations, FlinkYarnSessionCli and DefaultCLI, which parse the command line:

```java
// CliFrontend line 1187
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
    customCommandLines.add(
        loadCustomCommandLine(flinkYarnSessionCLI,
            configuration,
            configurationDirectory,
            "y",
            "yarn"));
} catch (NoClassDefFoundError | Exception e) {
    LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}

customCommandLines.add(new DefaultCLI(configuration));

return customCommandLines;
```

```java
// CliFrontend line 210: this decides which Cli type is used
final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
```

So when does this resolve to a YARN cluster and when to a standalone cluster? Because FlinkYarnSessionCli is added to customCommandLines first, the following check is tried first:

```java
// FlinkYarnSessionCli line 422
@Override
public boolean isActive(CommandLine commandLine) {
    String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
    boolean yarnJobManager = ID.equals(jobManagerOption);
    boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
    return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
}
```

As the code shows, if the user passed -m yarn-cluster, or supplied an application id, or a yarn properties file is configured, the YARN cluster mode is chosen; otherwise the cluster is treated as standalone.
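getActiveCustomCommandLine itself simply returns the first registered command line whose isActive check passes; since DefaultCLI.isActive always returns true, it acts as the fallback. A simplified sketch of that selection logic (condensed from CliFrontend 1.7, so details may differ slightly):

```java
// Simplified sketch: the first CustomCommandLine that declares itself active wins,
// which is why FlinkYarnSessionCli is registered before the catch-all DefaultCLI.
public CustomCommandLine<?> getActiveCustomCommandLine(CommandLine commandLine) {
    for (CustomCommandLine<?> cli : customCommandLines) {
        if (cli.isActive(commandLine)) {
            return cli;
        }
    }
    throw new IllegalStateException("No command-line ran.");
}
```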
Cluster Deployment

Flink describes the YARN cluster deployment configuration with a YarnClusterDescriptor; the corresponding configuration file is flink-conf.yaml. Deployment is triggered through the following logic:

```java
// YarnClusterDescriptor line 39
public class YarnClusterDescriptor extends AbstractYarnClusterDescriptor {

    public YarnClusterDescriptor(
            Configuration flinkConfiguration,
            YarnConfiguration yarnConfiguration,
            String configurationDirectory,
            YarnClient yarnClient,
            boolean sharedYarnClient) {
        super(
            flinkConfiguration,
            yarnConfiguration,
            configurationDirectory,
            yarnClient,
            sharedYarnClient);
    }
}
```

```java
// AbstractYarnClusterDescriptor line 471
protected ClusterClient<ApplicationId> deployInternal(
        ClusterSpecification clusterSpecification,
        String applicationName,
        String yarnClusterEntrypoint,
        @Nullable JobGraph jobGraph,
        boolean detached) throws Exception {
```

Roughly, deployInternal does the following (a hedged invocation sketch follows the list):
- check whether the YARN cluster queue has enough resources to satisfy the request
- set up the AM context, the launch command, and the submission context
- submit the AM context through the YARN client
- wrap the YARN client and related configuration into a YarnClusterClient and return it
- start the JobManager ActorSystem
- start the Flink UI
- start the YarnFlinkResourceManager, which talks to YARN's ResourceManager and manages YARN resources
- start the actor system supervisor process
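To make the entry point concrete, here is a rough sketch of deploying a session cluster programmatically with the same sizes as the example command above. The configuration directory path is made up, and the sketch calls the public deploySessionCluster, which goes through deployInternal internally; treat it as an illustration of the 1.7 API shape, not the exact code path CliFrontend takes.

```java
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class DeploySessionClusterSketch {

    public static void main(String[] args) throws Exception {
        // Illustrative paths and sizes; they mirror the -yjm/-ytm/-yn/-ys options above.
        Configuration flinkConfig = GlobalConfiguration.loadConfiguration("/opt/flink/conf");
        YarnConfiguration yarnConfig = new YarnConfiguration();

        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(yarnConfig);
        yarnClient.start();

        YarnClusterDescriptor descriptor = new YarnClusterDescriptor(
            flinkConfig, yarnConfig, "/opt/flink/conf", yarnClient, false);

        ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
            .setMasterMemoryMB(1024)        // -yjm
            .setTaskManagerMemoryMB(1024)   // -ytm
            .setNumberTaskManagers(2)       // -yn
            .setSlotsPerTaskManager(4)      // -ys
            .createClusterSpecification();

        // deploySessionCluster() runs through deployInternal(), which performs the
        // steps listed above (queue check, AM context, submission, client wrapping).
        ClusterClient<ApplicationId> client = descriptor.deploySessionCluster(spec);
        System.out.println("Web interface: " + client.getWebInterfaceURL());
    }
}
```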
Next comes submitting and running the program. After the program is submitted in CliFrontend, the following logic is triggered:

```java
// ClusterClient 39
public JobSubmissionResult run(PackagedProgram prog, int parallelism)
        throws ProgramInvocationException, ProgramMissingJobException {
    Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
    if (prog.isUsingProgramEntryPoint()) {

        final JobWithJars jobWithJars;
        if (hasUserJarsInClassPath(prog.getAllLibraries())) {
            jobWithJars = prog.getPlanWithoutJars();
        } else {
            jobWithJars = prog.getPlanWithJars();
        }

        return run(jobWithJars, parallelism, prog.getSavepointSettings());
    }
    else if (prog.isUsingInteractiveMode()) {
        log.info("Starting program in interactive mode (detached: {})", isDetached());

        final List<URL> libraries;
        if (hasUserJarsInClassPath(prog.getAllLibraries())) {
            libraries = Collections.emptyList();
        } else {
            libraries = prog.getAllLibraries();
        }

        ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
                prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
                prog.getSavepointSettings());
        ContextEnvironment.setAsContext(factory);

        try {
            // invoke main method
            prog.invokeInteractiveModeForExecution();
            if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
                throw new ProgramMissingJobException("The program didn't contain a Flink job.");
            }
            if (isDetached()) {
                // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
                return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
            }
            else {
                // in blocking mode, we execute all Flink jobs contained in the user code and then return here
                return this.lastJobExecutionResult;
            }
        }
        finally {
            ContextEnvironment.unsetContext();
        }
    }
    else {
        throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
    }
}
```

Note the call prog.invokeInteractiveModeForExecution(): this is the core of how the client generates the initial logical plan, and it is covered in detail in the next section.
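Mechanically, invokeInteractiveModeForExecution boils down to invoking the user jar's main method through reflection. The sketch below is a simplified illustration of that idea, not the exact PackagedProgram code; the class name com.example.FlinkDemo is made up.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class InvokeMainSketch {

    /** Reflectively call entryClass.main(args), as PackagedProgram does for the user jar. */
    static void callMainMethod(Class<?> entryClass, String[] args) throws Exception {
        Method mainMethod = entryClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalStateException("The entry point's main method is not static.");
        }
        // While main() runs, every StreamExecutionEnvironment.getExecutionEnvironment()
        // call is answered by the ContextEnvironmentFactory installed beforehand.
        mainMethod.invoke(null, (Object) args);
    }

    public static void main(String[] args) throws Exception {
        callMainMethod(Class.forName("com.example.FlinkDemo"), args); // class name is illustrative
    }
}
```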
Client-Side Logical Plan

As mentioned above, prog.invokeInteractiveModeForExecution() triggers the generation of the client-side logical plan. How does that work? All it really does is call the main function of the user's jar; the actual plan generation is driven by the execution of the user code. For example, suppose the user wrote the following Flink program:

```scala
object FlinkDemo extends App with Logging {

  override def main(args: Array[String]): Unit = {
    val properties = new Properties
    properties.setProperty("bootstrap.servers", DemoConfig.kafkaBrokerList)
    properties.setProperty("zookeeper.connect", "host01:2181,host02:2181,host03:2181/kafka08")
    properties.setProperty("group.id", "flink-demo")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE) // checkpoint every 5 seconds

    val stream = env
      .addSource(new FlinkKafkaConsumer08[String]("log.waimai_e", new SimpleStringSchema, properties))
      .setParallelism(2)

    val counts = stream.name("log.waimai_e")
      .map(toPoiIdTuple(_))
      .filter(_._2 != null)
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)

    counts.addSink(sendToKafka(_))
    env.execute()
  }
}
```

Note the line val env = StreamExecutionEnvironment.getExecutionEnvironment. It obtains the client-side environment configuration and first goes through this logic:

```java
// StreamExecutionEnvironment line 1256
public static StreamExecutionEnvironment getExecutionEnvironment() {
    if (contextEnvironmentFactory != null) {
        return contextEnvironmentFactory.createExecutionEnvironment();
    }

    // because the streaming project depends on "flink-clients" (and not the other way around)
    // we currently need to intercept the data set environment and create a dependent stream env.
    // this should be fixed once we rework the project dependencies
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // ...
```

The logic of ExecutionEnvironment.getExecutionEnvironment() for obtaining the environment is as follows:

```java
// ExecutionEnvironment line 1137
public static ExecutionEnvironment getExecutionEnvironment() {
    return contextEnvironmentFactory == null ?
        createLocalEnvironment() : contextEnvironmentFactory.createExecutionEnvironment();
}
```

Here contextEnvironmentFactory is a static member that was already initialized earlier by ContextEnvironment.setAsContext(factory), and it carries the following environment information:

```java
// ContextEnvironmentFactory line 51
public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
        List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
        boolean isDetached, String savepointPath) {
    this.client = client;
    this.jarFilesToAttach = jarFilesToAttach;
    this.classpathsToAttach = classpathsToAttach;
    this.userCodeClassLoader = userCodeClassLoader;
    this.defaultParallelism = defaultParallelism;
    this.isDetached = isDetached;
    this.savepointPath = savepointPath;
}
```

The client here is the YarnClusterClient created above; the other fields are self-explanatory. After executing val env = StreamExecutionEnvironment.getExecutionEnvironment, the user code ends up with a StreamContextEnvironment, which wraps some streaming execution settings (buffer timeout and so on) and also keeps a reference to the ContextEnvironment mentioned above. At this point, the execution environment information needed for streaming is fully set up.
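For completeness, the part of StreamExecutionEnvironment.getExecutionEnvironment() elided above (paraphrased here from the 1.7 sources, so treat the details as approximate) is what wraps the ContextEnvironment produced by the factory into that StreamContextEnvironment:

```java
// Continuation of StreamExecutionEnvironment#getExecutionEnvironment (paraphrased):
// the batch-side ContextEnvironment created by the factory is wrapped into the
// streaming StreamContextEnvironment; otherwise a local environment is created.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
if (env instanceof ContextEnvironment) {
    return new StreamContextEnvironment((ContextEnvironment) env);
} else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
    return new StreamPlanEnvironment(env);
} else {
    return createLocalEnvironment();
}
```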
Generating the Initial Logical Plan: the StreamGraph

Next, the user code reaches DataStream<String> stream = env.addSource(consumer); (in the Scala example above, val stream = env.addSource(...)). This produces a DataStream, the most central streaming abstraction in Flink; all subsequent operator transformations are applied on a DataStream. The addSource call triggers the following logic:

```java
// StreamExecutionEnvironment#addSource
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

    if (typeInfo == null) {
        if (function instanceof ResultTypeQueryable) {
            typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
        } else {
            try {
                typeInfo = TypeExtractor.createTypeInfo(
                        SourceFunction.class,
                        function.getClass(), 0, null, null);
            } catch (final InvalidTypesException e) {
                typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
            }
        }
    }

    boolean isParallel = function instanceof ParallelSourceFunction;

    clean(function);
    StreamSource<OUT, ?> sourceOperator;
    if (function instanceof StoppableFunction) {
        sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
    } else {
        sourceOperator = new StreamSource<>(function);
    }

    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
```

In short, this logic:
- obtains the TypeInformation describing the source's output
- creates the StreamSource sourceOperator
- creates and returns a DataStreamSource, which wraps the sourceOperator
- adds the StreamTransformation to the operator list transformations (only transform operations add operators; everything else merely stacks and wraps transformations)
- subsequent operations are then applied on the returned DataStream
For example, a map call on a DataStream boils down to:

```java
// DataStream#map
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
            Utils.getCallLocationName(), true);

    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
```

So a map operation triggers a transform. What does transform actually do?

```java
// DataStream line 1175
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
```

This step creates a StreamTransformation and wraps it, as a member variable, into another DataStream that is returned. StreamTransformation is Flink's core abstraction for dataflow transformations; only streams that need a transform produce a new DataStream operator, which is explained in more detail later. Also note the line getExecutionEnvironment().addOperator(resultTransform): Flink keeps track of every transformation:

```java
// StreamExecutionEnvironment line 1576
@Internal
public void addOperator(StreamTransformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}
```

So the user's chain of operations (map, join, and so on) is in fact a series of conversions on DataStreams, and Flink keeps all of these StreamTransformations around. Only at the very end, when the user calls env.execute(), does the actual construction of the StreamGraph begin. Calling env.execute() triggers the following logic:

```java
// StreamContextEnvironment line 32
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull("Streaming Job name should not be null.");

    StreamGraph streamGraph = this.getStreamGraph();
    streamGraph.setJobName(jobName);

    transformations.clear();

    // execute the programs
    if (ctx instanceof DetachedEnvironment) {
        LOG.warn("Job was executed in detached mode, the results will be available on completion.");
        ((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
        return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
    } else {
        return ctx
            .getClient()
            .runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath());
    }
}
```

This code does two things:
- first, it uses the StreamGraphGenerator to produce the StreamGraph (see the sketch after this list)
- then it uses the Client to run the StreamGraph
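The first step is a one-liner in StreamExecutionEnvironment: getStreamGraph hands the accumulated transformations to the StreamGraphGenerator (paraphrased from the 1.7 sources, so treat the exact wording as approximate):

```java
// StreamExecutionEnvironment#getStreamGraph (paraphrased): the StreamGraph is built
// from the StreamTransformations collected by addOperator() while the user code ran.
@Internal
public StreamGraph getStreamGraph() {
    if (transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    }
    return StreamGraphGenerator.generate(this, transformations);
}
```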
Summary

This chapter walked through the client side of a Flink on YARN submission: configuration loading and argument parsing in CliFrontend, cluster-type resolution and deployment through FlinkYarnSessionCli and the YarnClusterDescriptor, and how executing the user's main method accumulates StreamTransformations that env.execute() finally turns into the initial logical plan, the StreamGraph.