日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

flink DataStream API使用及原理

發布時間:2025/4/5 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 flink DataStream API使用及原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

傳統的大數據處理方式一般是批處理式的,也就是說,今天所收集的數據,我們明天再把今天收集到的數據算出來,以供大家使用,但是在很多情況下,數據的時效性對于業務的成敗是非常關鍵的。

Spark 和 Flink 都是通用的開源大規模處理引擎,目標是在一個系統中支持所有的數據處理以帶來效能的提升。兩者都有相對比較成熟的生態系統。是下一代大數據引擎最有力的競爭者。

Spark 的生態總體更完善一些,在機器學習的集成和易用性上暫時領先。

Flink 在流計算上有明顯優勢,核心架構和模型也更透徹和靈活一些。

本文主要通過實例來分析flink的流式處理過程,并通過源碼的方式來介紹流式處理的內部機制。

DataStream整體概述

主要分5部分,下面我們來分別介紹:

?1.運行環境StreamExecutionEnvironment

StreamExecutionEnvironment是個抽象類,是流式處理的容器,實現類有兩個,分別是

LocalStreamEnvironment: RemoteStreamEnvironment: /*** The StreamExecutionEnvironment is the context in which a streaming program is executed. A* {@link LocalStreamEnvironment} will cause execution in the current JVM, a* {@link RemoteStreamEnvironment} will cause execution on a remote setup.** <p>The environment provides methods to control the job execution (such as setting the parallelism* or the fault tolerance/checkpointing parameters) and to interact with the outside world (data access).** @see org.apache.flink.streaming.api.environment.LocalStreamEnvironment* @see org.apache.flink.streaming.api.environment.RemoteStreamEnvironment*/

2.數據源DataSource數據輸入

包含了輸入格式InputFormat

/*** Creates a new data source.** @param context The environment in which the data source gets executed.* @param inputFormat The input format that the data source executes.* @param type The type of the elements produced by this input format.*/public DataSource(ExecutionEnvironment context, InputFormat<OUT, ?> inputFormat, TypeInformation<OUT> type, String dataSourceLocationName) {super(context, type);this.dataSourceLocationName = dataSourceLocationName;if (inputFormat == null) {throw new IllegalArgumentException("The input format may not be null.");}this.inputFormat = inputFormat;if (inputFormat instanceof NonParallelInput) {this.parallelism = 1;}}

?flink將數據源主要分為內置數據源和第三方數據源,內置數據源有 文件,網絡socket端口及集合類型數據;第三方數據源實用Connector的方式來連接如kafka Connector,es connector等,自己定義的話,可以實現SourceFunction,封裝成Connector來做。

?

3.DataStream轉換

DataStream:同一個類型的流元素,DataStream可以通過transformation轉換成另外的DataStream,示例如下

@link DataStream#map

@link DataStream#filter

?StreamOperator:流式算子的基本接口,三個實現類

AbstractStreamOperator:

OneInputStreamOperator:

TwoInputStreamOperator:

/*** Basic interface for stream operators. Implementers would implement one of* {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or* {@link org.apache.flink.streaming.api.operators.TwoInputStreamOperator} to create operators* that process elements.** <p>The class {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator}* offers default implementation for the lifecycle and properties methods.** <p>Methods of {@code StreamOperator} are guaranteed not to be called concurrently. Also, if using* the timer service, timer callbacks are also guaranteed not to be called concurrently with* methods on {@code StreamOperator}.** @param <OUT> The output type of the operator*/

?4.DataStreamSink輸出

/*** Adds the given sink to this DataStream. Only streams with sinks added* will be executed once the {@link StreamExecutionEnvironment#execute()}* method is called.** @param sinkFunction* The object containing the sink's invoke function.* @return The closed DataStream.*/public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {// read the output type of the input Transform to coax out errors about MissingTypeInfo transformation.getOutputType();// configure the type if neededif (sinkFunction instanceof InputTypeConfigurable) {((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());}StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);getExecutionEnvironment().addOperator(sink.getTransformation());return sink;}

5.執行

/*** Executes the JobGraph of the on a mini cluster of ClusterUtil with a user* specified name.** @param jobName* name of the job* @return The result of the job execution, containing elapsed time and accumulators.*/@Overridepublic JobExecutionResult execute(String jobName) throws Exception {// transform the streaming program into a JobGraphStreamGraph streamGraph = getStreamGraph();streamGraph.setJobName(jobName);JobGraph jobGraph = streamGraph.getJobGraph();jobGraph.setAllowQueuedScheduling(true);Configuration configuration = new Configuration();configuration.addAll(jobGraph.getJobConfiguration());configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");// add (and override) the settings with what the user definedconfiguration.addAll(this.configuration);if (!configuration.contains(RestOptions.BIND_PORT)) {configuration.setString(RestOptions.BIND_PORT, "0");}int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumSlotsPerTaskManager(numSlotsPerTaskManager).build();if (LOG.isInfoEnabled()) {LOG.info("Running job on local embedded Flink mini cluster");}MiniCluster miniCluster = new MiniCluster(cfg);try {miniCluster.start();configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());return miniCluster.executeJobBlocking(jobGraph);}finally {transformations.clear();miniCluster.close();}}

6.總結

  Flink的執行方式類似于管道,它借鑒了數據庫的一些執行原理,實現了自己獨特的執行方式。

7.展望

Stream涉及的內容還包括Watermark,window等概念,因篇幅限制,這篇僅介紹flink DataStream API使用及原理。

下篇將介紹Watermark,下下篇是windows窗口計算。

參考資料

【1】https://baijiahao.baidu.com/s?id=1625545704285534730&wfr=spider&for=pc

【2】https://blog.51cto.com/13654660/2087705

轉載于:https://www.cnblogs.com/davidwang456/p/11046857.html

總結

以上是生活随笔為你收集整理的flink DataStream API使用及原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: av免费在线不卡 | 不卡一区在线 | 国模婷婷| 黄色小视频在线观看免费 | 日韩视频在线播放 | 亚洲一区二区三区四区在线观看 | 就去干成人网 | 曰韩av | 日韩视频欧美视频 | 久久亚洲高清 | 日本一区不卡在线观看 | 欧美日韩国产麻豆 | 欧美在线色图 | 久久精品视频久久 | 一区二区三区四区五区av | 国产乱子伦精品无码专区 | 日本视频在线观看 | 精品人妻久久久久久888不卡 | 一级欧美一级日韩片 | 波多野结衣大片 | 91精品国产色综合久久不卡粉嫩 | 亚洲va久久久噜噜噜无码久久 | 日韩福利一区二区三区 | 国产精品日韩一区二区 | 高清在线一区二区三区 | 在线中文一区 | 欧美黄在线观看 | 99ri精品| 大胸美女被爆操 | 国产精品第100页 | 人成在线视频 | 性午夜| 欧美日韩在线中文字幕 | aa视频在线 | 亚洲经典一区二区三区 | 在线视频天堂 | 大奶子在线 | 欧美日本韩国一区二区 | 亚洲第一色网站 | 精品福利片 | 亚洲福利视频导航 | 日韩欧美一二三区 | 天天曰天天干 | 在线激情| 一卡二卡国产 | 96视频在线 | 亚洲美女av网站 | 欧美特级视频 | 国产精品日韩欧美一区二区三区 | 911美女片黄在线观看游戏 | 乱淫67194 | 91五月天 | 国产精品久久久久久久久免费相片 | 日本人和亚洲人zjzjhd | 国产视频首页 | 边添小泬边狠狠躁视频 | 久久国产成人 | 亚洲熟女综合色一区二区三区 | 日本少妇作爱视频 | 91国产中文字幕 | 久久综合在线 | 不许穿内裤随时挨c调教h苏绵 | 春闺艳妇(h)高h产乳 | 欧美大片一区二区三区 | 中文字幕在线视频一区 | 免费观看国产精品视频 | 蜜桃av噜噜一区二区三区麻豆 | 天堂色播 | 国产黄色在线网站 | 精品无人国产偷自产在线 | 亚洲三页 | 亚洲精品大片 | 91亚瑟视频 | 色噜噜狠狠成人中文 | 双性高h1v1| 色多多视频在线 | 中文字幕+乱码+中文字幕一区 | 奇米91| 国产精品福利视频 | 欧美日韩国内 | 麻豆观看| 亚洲精品视频国产 | 一区二区 亚洲 | 波多野结衣中文字幕在线 | 91一二区| 日韩无码电影 | 91丝袜| www.国产91 | 精品久| 亚洲第一精品在线观看 | 新疆毛片| 午夜精品极品粉嫩国产尤物 | 久青草国产在线 | 99国产精品99久久久久久粉嫩 | 欧美日韩1区2区 | 亚洲精品~无码抽插 | 青青青国内视频在线观看软件 | 亚洲不卡影院 | 国产一级在线免费观看 |