
Analyzing Flink components from flink-example (1): WordCount batch in practice and source-code analysis


The previous post, "Running the Flink example program on Windows", briefly showed how to run a pre-packaged example program (jar) through the Flink web UI on Windows. So why use Flink in the first place?

Flink features

The official website lists the following features:

1. All streaming use cases
  • Event-driven applications
  • Stream & batch analytics
  • Data pipelines & ETL

2. Guaranteed correctness
  • Exactly-once state consistency
  • Event-time processing
  • Sophisticated late-data handling

3. Layered APIs
  • SQL on stream & batch data
  • DataStream API & DataSet API
  • ProcessFunction (time & state)

4. Ease of use and operations
  • Flexible deployment
  • High-availability setup
  • Savepoints

5. Scalability
  • Scale-out architecture
  • Support for very large state
  • Incremental checkpointing

6. High performance
  • Low latency
  • High throughput
  • In-memory computing

Flink architecture

1. Layered structure (diagram in the original post)

2. Runtime architecture (diagram in the original post)

Flink in practice

1. Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>flinkDemo</groupId>
    <artifactId>flinkDemo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.0</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hbase -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.10</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>com.github.rholder</groupId>
            <artifactId>guava-retrying</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. Java program

public class WordCountDemo {

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // create execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
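For a quick local run without WordCountData or command-line arguments, a minimal variant can feed a couple of in-memory strings into the same pipeline. This is only a sketch: the class name WordCountLocalSketch and the input strings are illustrative, and it assumes the Flink 1.5 DataSet API plus the Tokenizer class shown above.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class WordCountLocalSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // hypothetical in-memory input instead of WordCountData / --input
        DataSet<String> text = env.fromElements(
                "to be or not to be",
                "that is the question");

        DataSet<Tuple2<String, Integer>> counts = text
                .flatMap(new WordCountDemo.Tokenizer()) // reuse the Tokenizer shown above
                .groupBy(0)
                .sum(1);

        counts.print(); // print() triggers execution, so no explicit env.execute() is needed here
    }
}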

3. Step-by-step debugging analysis

Step 1: obtain the execution environment (ExecutionEnvironment.java)

/**
 * The ExecutionEnvironment is the context in which a program is executed. A
 * {@link LocalEnvironment} will cause execution in the current JVM, a
 * {@link RemoteEnvironment} will cause execution on a remote setup.
 *
 * <p>The environment provides methods to control the job execution (such as setting the parallelism)
 * and to interact with the outside world (data access).
 *
 * <p>Please note that the execution environment needs strong type information for the input and return types
 * of all operations that are executed. This means that the environments needs to know that the return
 * value of an operation is for example a Tuple of String and Integer.
 * Because the Java compiler throws much of the generic type information away, most methods attempt to re-
 * obtain that information using reflection. In certain cases, it may be necessary to manually supply that
 * information to some of the methods.
 *
 * @see LocalEnvironment
 * @see RemoteEnvironment
 */

Creating the local environment:

/**
 * Creates a {@link LocalEnvironment} which is used for executing Flink jobs.
 *
 * @param configuration to start the {@link LocalEnvironment} with
 * @param defaultParallelism to initialize the {@link LocalEnvironment} with
 * @return {@link LocalEnvironment}
 */
private static LocalEnvironment createLocalEnvironment(Configuration configuration, int defaultParallelism) {
    final LocalEnvironment localEnvironment = new LocalEnvironment(configuration);
    if (defaultParallelism > 0) {
        localEnvironment.setParallelism(defaultParallelism);
    }
    return localEnvironment;
}
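When the demo runs inside an IDE, getExecutionEnvironment() ends up producing such a LocalEnvironment. A local environment can also be requested explicitly; a minimal sketch (the parallelism value 2 is only illustrative):

// Run the job in the current JVM with a default parallelism of 2
ExecutionEnvironment localEnv = ExecutionEnvironment.createLocalEnvironment(2);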

Step 2: obtain the input data and create a DataSet (ExecutionEnvironment.java)

/**
 * Creates a DataSet from the given non-empty collection. Note that this operation will result
 * in a non-parallel data source, i.e. a data source with a parallelism of one.
 *
 * <p>The returned DataSet is typed to the given TypeInformation.
 *
 * @param data The collection of elements to create the data set from.
 * @param type The TypeInformation for the produced data set.
 * @return A DataSet representing the given collection.
 *
 * @see #fromCollection(Collection)
 */
public <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type) {
    return fromCollection(data, type, Utils.getCallLocationName());
}

private <X> DataSource<X> fromCollection(Collection<X> data, TypeInformation<X> type, String callLocationName) {
    CollectionInputFormat.checkCollection(data, type.getTypeClass());
    return new DataSource<>(this, new CollectionInputFormat<>(data, type.createSerializer(config)), type, callLocationName);
}
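The demo's default input from WordCountData reaches this code path (fromElements delegates to fromCollection), so the default text lines become a non-parallel collection source. Calling it directly looks roughly like the sketch below; the sample lines are illustrative and BasicTypeInfo.STRING_TYPE_INFO is the standard TypeInformation for String:

// Collection-backed source with parallelism 1 (CollectionInputFormat under the hood)
List<String> lines = Arrays.asList("hello flink", "hello world");
DataSet<String> text = env.fromCollection(lines, BasicTypeInfo.STRING_TYPE_INFO);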

The DataSet inheritance hierarchy

DataSet is an abstract class representing a collection of elements of the same type; it provides transformations such as map, reduce, join, and coGroup.

/**
 * A DataSet represents a collection of elements of the same type.
 *
 * <p>A DataSet can be transformed into another DataSet by applying a transformation as for example
 * <ul>
 *   <li>{@link DataSet#map(org.apache.flink.api.common.functions.MapFunction)},</li>
 *   <li>{@link DataSet#reduce(org.apache.flink.api.common.functions.ReduceFunction)},</li>
 *   <li>{@link DataSet#join(DataSet)}, or</li>
 *   <li>{@link DataSet#coGroup(DataSet)}.</li>
 * </ul>
 *
 * @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
 */

Operator is the abstract base class of all operators in the Java API.

/**
 * Base class of all operators in the Java API.
 *
 * @param <OUT> The type of the data set produced by this operator.
 * @param <O> The type of the operator, so that we can return it.
 */
@Public
public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<OUT> {

DataSource is the concrete implementation representing a data source.

/**
 * An operation that creates a new data set (data source). The operation acts as the
 * data set on which to apply further transformations. It encapsulates additional
 * configuration parameters, to customize the execution.
 *
 * @param <OUT> The type of the elements produced by this data source.
 */
@Public
public class DataSource<OUT> extends Operator<OUT, DataSource<OUT>> {

Step 3: transform the input DataSet

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

>> Calling flatMap (DataSet.java)

/**
 * Applies a FlatMap transformation on a {@link DataSet}.
 *
 * <p>The transformation calls a {@link org.apache.flink.api.common.functions.RichFlatMapFunction} for each element of the DataSet.
 * Each FlatMapFunction call can return any number of elements including none.
 *
 * @param flatMapper The FlatMapFunction that is called for each element of the DataSet.
 * @return A FlatMapOperator that represents the transformed DataSet.
 *
 * @see org.apache.flink.api.common.functions.RichFlatMapFunction
 * @see FlatMapOperator
 * @see DataSet
 */
public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
    if (flatMapper == null) {
        throw new NullPointerException("FlatMap function must not be null.");
    }

    String callLocation = Utils.getCallLocationName();
    TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
    return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
}

>> Calling groupBy (DataSet.java)

/**
 * Groups a {@link Tuple} {@link DataSet} using field position keys.
 *
 * <p><b>Note: Field position keys only be specified for Tuple DataSets.</b>
 *
 * <p>The field position keys specify the fields of Tuples on which the DataSet is grouped.
 * This method returns an {@link UnsortedGrouping} on which one of the following grouping transformation
 * can be applied.
 * <ul>
 *   <li>{@link UnsortedGrouping#sortGroup(int, org.apache.flink.api.common.operators.Order)} to get a {@link SortedGrouping}.
 *   <li>{@link UnsortedGrouping#aggregate(Aggregations, int)} to apply an Aggregate transformation.
 *   <li>{@link UnsortedGrouping#reduce(org.apache.flink.api.common.functions.ReduceFunction)} to apply a Reduce transformation.
 *   <li>{@link UnsortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)} to apply a GroupReduce transformation.
 * </ul>
 *
 * @param fields One or more field positions on which the DataSet will be grouped.
 * @return A Grouping on which a transformation needs to be applied to obtain a transformed DataSet.
 *
 * @see Tuple
 * @see UnsortedGrouping
 * @see AggregateOperator
 * @see ReduceOperator
 * @see org.apache.flink.api.java.operators.GroupReduceOperator
 * @see DataSet
 */
public UnsortedGrouping<T> groupBy(int... fields) {
    return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
}

>> Calling sum (UnsortedGrouping.java)

/**
 * Syntactic sugar for aggregate (SUM, field).
 *
 * @param field The index of the Tuple field on which the aggregation function is applied.
 * @return An AggregateOperator that represents the summed DataSet.
 *
 * @see org.apache.flink.api.java.operators.AggregateOperator
 */
public AggregateOperator<T> sum(int field) {
    return this.aggregate(Aggregations.SUM, field, Utils.getCallLocationName());
}

// private helper that allows to set a different call location name
private AggregateOperator<T> aggregate(Aggregations agg, int field, String callLocationName) {
    return new AggregateOperator<T>(this, agg, field, callLocationName);
}

The relationship between UnsortedGrouping and DataSet

UnsortedGrouping performs aggregation through an AggregateOperator, as the sketch below shows.
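In other words, sum(1) in the WordCount chain is only syntactic sugar: spelling the aggregation out on the grouping produces the same AggregateOperator. A sketch of the equivalent call, in the same main-method context as the demo:

// Equivalent to text.flatMap(new Tokenizer()).groupBy(0).sum(1)
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer())
        .groupBy(0)
        .aggregate(Aggregations.SUM, 1);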

Step 4: emit the transformed result

// emit result
if (params.has("output")) {
    counts.writeAsCsv(params.get("output"), "\n", " ");
    // execute program
    env.execute("WordCount Example");
} else {
    System.out.println("Printing result to stdout. Use --output to specify output path.");
    counts.print();
}

If no output parameter is specified, the result is printed to the console:

/**
 * Prints the elements in a DataSet to the standard output stream {@link System#out} of the JVM that calls
 * the print() method. For programs that are executed in a cluster, this method needs
 * to gather the contents of the DataSet back to the client, to print it there.
 *
 * <p>The string written for each element is defined by the {@link Object#toString()} method.
 *
 * <p>This method immediately triggers the program execution, similar to the
 * {@link #collect()} and {@link #count()} methods.
 *
 * @see #printToErr()
 * @see #printOnTaskManager(String)
 */
public void print() throws Exception {
    List<T> elements = collect();
    for (T e: elements) {
        System.out.println(e);
    }
}
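Because print() is built on collect(), the results can also be gathered back to the client as a plain list, which is handy for assertions in tests. A short sketch continuing from the counts DataSet above:

// collect() also triggers execution and ships the results back to the client
List<Tuple2<String, Integer>> result = counts.collect();
for (Tuple2<String, Integer> entry : result) {
    System.out.println(entry.f0 + " -> " + entry.f1);
}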

If an output path is specified, the result is first turned into a DataSink that writes CSV files; a DataSink is the operation used to store data results:

/**
 * An operation that allows storing data results.
 *
 * @param <T>
 */

The process is as follows:

/**
 * Writes a {@link Tuple} DataSet as CSV file(s) to the specified location with the specified field and line delimiters.
 *
 * <p><b>Note: Only a Tuple DataSet can written as a CSV file.</b>
 * For each Tuple field the result of {@link Object#toString()} is written.
 *
 * @param filePath The path pointing to the location the CSV file is written to.
 * @param rowDelimiter The row delimiter to separate Tuples.
 * @param fieldDelimiter The field delimiter to separate Tuple fields.
 * @param writeMode The behavior regarding existing files. Options are NO_OVERWRITE and OVERWRITE.
 *
 * @see Tuple
 * @see CsvOutputFormat
 * @see DataSet#writeAsText(String) Output files and directories
 */
public DataSink<T> writeAsCsv(String filePath, String rowDelimiter, String fieldDelimiter, WriteMode writeMode) {
    return internalWriteAsCsv(new Path(filePath), rowDelimiter, fieldDelimiter, writeMode);
}

@SuppressWarnings("unchecked")
private <X extends Tuple> DataSink<T> internalWriteAsCsv(Path filePath, String rowDelimiter, String fieldDelimiter, WriteMode wm) {
    Preconditions.checkArgument(getType().isTupleType(), "The writeAsCsv() method can only be used on data sets of tuples.");
    CsvOutputFormat<X> of = new CsvOutputFormat<>(filePath, rowDelimiter, fieldDelimiter);
    if (wm != null) {
        of.setWriteMode(wm);
    }
    return output((OutputFormat<T>) of);
}

/**
 * Emits a DataSet using an {@link OutputFormat}. This method adds a data sink to the program.
 * Programs may have multiple data sinks. A DataSet may also have multiple consumers (data sinks
 * or transformations) at the same time.
 *
 * @param outputFormat The OutputFormat to process the DataSet.
 * @return The DataSink that processes the DataSet.
 *
 * @see OutputFormat
 * @see DataSink
 */
public DataSink<T> output(OutputFormat<T> outputFormat) {
    Preconditions.checkNotNull(outputFormat);

    // configure the type if needed
    if (outputFormat instanceof InputTypeConfigurable) {
        ((InputTypeConfigurable) outputFormat).setInputType(getType(), context.getConfig());
    }

    DataSink<T> sink = new DataSink<>(this, outputFormat, getType());
    this.context.registerDataSink(sink);
    return sink;
}
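If the job is re-run against an existing output directory, the writeAsCsv overload with a WriteMode (shown above) can be used to overwrite it. A sketch with a placeholder output path:

// Overwrite any existing files at the (hypothetical) output location
counts.writeAsCsv("/tmp/wordcount-output", "\n", " ", FileSystem.WriteMode.OVERWRITE);
env.execute("WordCount Example");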

Finally, the job is executed:

@Override
public JobExecutionResult execute(String jobName) throws Exception {
    if (executor == null) {
        startNewSession();
    }

    Plan p = createProgramPlan(jobName);

    // Session management is disabled, revert this commit to enable
    //p.setJobId(jobID);
    //p.setSessionTimeout(sessionTimeout);

    JobExecutionResult result = executor.executePlan(p);

    this.lastJobExecutionResult = result;
    return result;
}
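For completeness, execute() returns a JobExecutionResult carrying the job runtime and any accumulator values; only sink-based programs (such as the writeAsCsv branch) need to call it, since print() and collect() trigger execution themselves. A small sketch of using the return value:

JobExecutionResult result = env.execute("WordCount Example");
System.out.println("Job finished in " + result.getNetRuntime() + " ms");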

This execution phase involves quite a lot of detail, so it will be covered in the next post.

Summary

Apache Flink is a powerful framework that supports developing and running many different kinds of applications. Its main features include unified batch and stream processing, sophisticated state management, event-time support, and exactly-once state consistency guarantees. Flink runs on resource managers such as YARN, Mesos, and Kubernetes, and can also be deployed standalone on bare-metal clusters. With the high-availability option enabled, it has no single point of failure. Flink has been shown to scale to thousands of cores with state on the order of terabytes while still delivering high throughput and low latency, and many demanding stream processing applications around the world run on it.

Its typical application scenarios are as follows:

1. Event-driven applications
  Typical examples:
  • Fraud detection
  • Anomaly detection
  • Rule-based alerting
  • Business process monitoring
  • Web applications (social networks)
2. Data analytics applications
  Typical examples:
  • Quality monitoring of telecom networks
  • Analysis of product updates and experiment evaluation in mobile applications
  • Ad-hoc analysis of live data in consumer technology
  • Large-scale graph analysis
3. Data pipeline applications
  Typical examples:
  • Real-time search index building in e-commerce
  • Continuous ETL in e-commerce

References

[1] https://flink.apache.org/

[2] https://blog.csdn.net/yangyin007/article/details/82382734

[3] https://flink.apache.org/zh/usecases.html


Reposted from: https://www.cnblogs.com/davidwang456/p/10948698.html
