1.10. Flink DataStream API (API abstraction levels, Data Sources, connectors, Source fault-tolerance guarantees, Sink fault-tolerance guarantees, custom sinks, partitioning, etc.)
1.10. Flink DataStream API
1.10.1. Abstraction levels of the Flink API
1.10.2. Data sources in detail
1.10.2.1. DataStream API: Data Sources
1.10.2.2. Data Sources API
1.10.2.3. Built-in DataStream connectors
1.10.2.4. Source fault-tolerance guarantees
1.10.2.5. Sink fault-tolerance guarantees
1.10.2.6. Custom sinks
1.10.2.7. Table & SQL Connectors
1.10.2.8. Custom sources
1.10.2.9. DataStream API: Transformations in detail
1.10.2.10. DataStream API: partitioning
1.10.2.11. DataStream API: Data Sinks
1.10. Flink DataStream API
1.10.1. Abstraction levels of the Flink API
The lowest level of abstraction in the Flink API is stateful real-time stream processing. It is implemented by the Process Function, which the Flink framework integrates into the DataStream API. It allows users to freely process events from one or more streams and provides state that is globally consistent and fault-tolerant. In addition, users can register event-time and processing-time callbacks at this level, which makes complex computations possible.

The second level of abstraction is the Core APIs. In practice, many applications do not need the low-level abstraction described above and can program against the Core APIs instead, which consist of the DataStream API (for bounded/unbounded data streams) and the DataSet API (for bounded data sets). These fluent APIs provide common building blocks for data processing, such as various forms of user-defined transformations, joins, aggregations, windows, and state operations. The data types processed at this level have corresponding classes in each programming language.

The integration of low-level abstractions like the Process Function with the DataStream API makes it possible to drop down to the lower-level abstraction whenever necessary. The DataSet API additionally offers primitives such as loops/iterations.

The third level of abstraction is the Table API. It is a declarative DSL centered around tables; in a streaming scenario, a table represents data that is changing dynamically. The Table API follows the (extended) relational model: a table has a schema (similar to the schema of a relational database), and the API offers comparable operations such as select, project, join, group-by, and aggregate. A Table API program declaratively defines what logical operation should be performed rather than spelling out exactly what code should run. Although the Table API is concise and can be extended with various kinds of user-defined functions, it is less expressive than the Core APIs. Before execution, Table API programs are additionally optimized by applying the optimizer's rules to the user's expressions.

Tables and DataStream/DataSet can be converted back and forth seamlessly, so Flink allows programs to mix the Table API with the DataStream/DataSet API.

The top level of abstraction is SQL. This layer is similar to the Table API in both semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction is tightly coupled to the Table API, and SQL queries can be executed over tables defined with the Table API.
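To make the interplay between these layers concrete, here is a minimal sketch that registers a DataStream as a table and runs a SQL query on it. It assumes the flink-table-api-java-bridge and a table planner dependency have been added (they are not in the pom.xml below); the table and column names are made up for the example.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableAndSqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Tuple2<String, Integer>> words =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

        // Table API layer: register the DataStream as a dynamically changing table
        tEnv.createTemporaryView("words", words, $("word"), $("cnt"));

        // SQL layer: queries run over tables defined in the Table API
        Table result = tEnv.sqlQuery("SELECT word, SUM(cnt) AS total FROM words GROUP BY word");

        // back down to the DataStream layer (a retract stream, since the aggregate updates its results)
        tEnv.toRetractStream(result, Row.class).print();

        env.execute("TableAndSqlDemo");
    }
}
```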
The pom.xml required by the examples in this document is shown below:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>xxxxx.demo</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- maven properties -->
        <maven.test.skip>false</maven.test.skip>
        <maven.javadoc.skip>false</maven.javadoc.skip>
        <!-- compiler settings properties -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <rocketmq.version>4.7.1</rocketmq.version>
        <flink.version>1.11.1</flink.version>
        <commons-lang.version>2.5</commons-lang.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <distributionManagement>
        <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://ip/nexus/content/repositories/releases/</url>
        </repository>
        <snapshotRepository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://ip/nexus/content/repositories/snapshots/</url>
        </snapshotRepository>
    </distributionManagement>

    <repositories>
        <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://ip/nexus/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://ip/nexus/content/repositories/snapshots/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>warn</checksumPolicy>
            </snapshots>
        </repository>
        <repository>
            <id>tianque</id>
            <name>tianque</name>
            <url>http://ip/nexus/content/repositories/tianque/</url>
        </repository>
        <repository>
            <id>public</id>
            <name>public</name>
            <url>http://ip/nexus/content/groups/public/</url>
        </repository>
        <!-- newly added -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--
            Maven scopes:
            1. compile : the default scope; needed at runtime and packaged into the jar.
            2. provided : needed at compile time only; supplied by the runtime, not packaged.
            3. runtime : not needed for compilation but needed at runtime; packaged (interface/implementation separation).
            4. test : needed for tests only; not packaged.
            5. system : a jar taken from a system path rather than a repository (rarely used).
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.tianque.doraemon</groupId>
            <artifactId>issue-business-api</artifactId>
            <version>1.0.6.RELEASE</version>
        </dependency>
        <!-- dependencies needed when programming in Scala: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- dependencies needed when programming in Scala: end -->
        <!-- kafka connector, scala 2.12 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>${rocketmq.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-tcnative</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>${commons-lang.version}</version>
        </dependency>
        <!-- test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-namesrv</artifactId>
            <version>${rocketmq.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-broker</artifactId>
            <version>${rocketmq.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.tianque</groupId>
            <artifactId>caterpillar-sdk</artifactId>
            <version>0.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>UTF-8</encoding>
                    <compilerVersion>${maven.compiler.source}</compilerVersion>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>${maven.test.skip}</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <version>0.12</version>
                <configuration>
                    <excludes>
                        <exclude>README.md</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-checkstyle-plugin</artifactId>
                <version>2.17</version>
                <executions>
                    <execution>
                        <id>verify</id>
                        <phase>verify</phase>
                        <configuration>
                            <configLocation>style/rmq_checkstyle.xml</configLocation>
                            <encoding>UTF-8</encoding>
                            <consoleOutput>true</consoleOutput>
                            <failsOnError>true</failsOnError>
                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
                            <includeTestResources>false</includeTestResources>
                        </configuration>
                        <goals>
                            <goal>check</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <configuration>
                    <aggregate>true</aggregate>
                    <reportOutputDirectory>javadocs</reportOutputDirectory>
                    <locale>en</locale>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- assembly plugin for building a fat jar (includes all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- optionally set the jar's entry class; adjust for your own project -->
                            <mainClass>xxxxx.SocketWindowWordCountJava</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

1.10.2. Data sources in detail
1.10.2.1. DataStream API: Data Sources
A source is the data input of a program. You can add a source to your program via StreamExecutionEnvironment.addSource(sourceFunction).

Flink ships with a large number of ready-made sources, and you can also implement your own:

- implement the SourceFunction interface to create a non-parallel source (parallelism 1);
- or implement the ParallelSourceFunction interface, or extend RichParallelSourceFunction, to create a parallel source.

The following code shows how to implement custom sources.
```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * A custom source with parallelism 1.
 *
 * Emits an increasing sequence of numbers starting from 1.
 *
 * Note:
 * SourceFunction and SourceContext both need an explicit type argument; without it the job fails at runtime with:
 * Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
 * The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred.
 * Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class MyNoParalleSource implements SourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: it starts the source.
     * In most cases this run method contains a loop, so that records are produced continuously.
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // produce one record per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the source is cancelled.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

```java
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

/**
 * A custom source that supports a parallelism greater than 1.
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: it starts the source.
     * In most cases this run method contains a loop, so that records are produced continuously.
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // produce one record per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the source is cancelled.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;

/**
 * A custom source that supports a parallelism greater than 1.
 *
 * RichParallelSourceFunction additionally provides open and close methods:
 * if the source needs external resources such as connections, acquire them
 * in open() and release them in close().
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class MyRichParalleSource extends RichParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: it starts the source.
     * In most cases this run method contains a loop, so that records are produced continuously.
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // produce one record per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the source is cancelled.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Called exactly once at the very beginning: acquire connections here.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open.............");
        super.open(parameters);
    }

    /**
     * Release connections here.
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}
```

Using the custom sources:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses the source with parallelism 1.
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class StreamingDemoWithMyNoPralalleSource {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data source
        // note: for this source the parallelism can only be set to 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}
```

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses a source with parallelism greater than 1.
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class StreamingDemoWithMyPralalleSource {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data source
        DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}
```

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses a source with parallelism greater than 1 (the rich variant with open/close).
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class StreamingDemoWithMyRichPralalleSource {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data source
        DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}
```

Below are the Scala implementations:
```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom source with parallelism 1.
 *
 * Emits an increasing sequence of numbers starting from 1.
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
class MyNoParallelSourceScala extends SourceFunction[Long] {

  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }
}
```

```scala
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom parallel source.
 *
 * Emits an increasing sequence of numbers starting from 1.
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
class MyParallelSourceScala extends ParallelSourceFunction[Long] {

  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }
}
```

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom parallel source with open/close hooks.
 *
 * Emits an increasing sequence of numbers starting from 1.
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
class MyRichParallelSourceScala extends RichParallelSourceFunction[Long] {

  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }

  override def open(parameters: Configuration): Unit = super.open(parameters)

  override def close(): Unit = super.close()
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
object StreamingDemoWithMyNoParallelSourceScala {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyNoParallelSourceScala")
  }
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
object StreamingDemoWithMyParallelSourceScala {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyParallelSourceScala).setParallelism(2)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyParallelSourceScala")
  }
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
object StreamingDemoWithMyRichParallelSourceScala {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyRichParallelSourceScala).setParallelism(2)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyRichParallelSourceScala")
  }
}
```

1.10.2.2. Data Sources API
- Based on files
  - readTextFile(path)
  - Reads a text file line by line following the TextInputFormat rules and returns each line as a record.
- Based on sockets
  - socketTextStream reads data from a socket; elements can be separated by a delimiter (see the sketch after this list).
- Based on collections
  - fromCollection(Collection)
  - Creates a data stream from a Java collection; all elements in the collection must be of the same type.
- Custom input
  - addSource can read data from third-party data sources
  - Flink ships with a set of built-in connectors that provide the corresponding sources (e.g. Kafka)
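Before the collection-based example below, here is a minimal sketch of the file- and socket-based variants; the file path and the host/port are placeholders:

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BuiltInSourcesDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // file-based source: reads the file line by line using TextInputFormat
        DataStreamSource<String> fileLines = env.readTextFile("file:///tmp/input.txt");

        // socket-based source: reads text from a socket, records separated by "\n"
        DataStreamSource<String> socketLines = env.socketTextStream("hadoop100", 9000, "\n");

        fileLines.print();
        socketLines.print();

        env.execute("BuiltInSourcesDemo");
    }
}
```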
A collection-based example using fromCollection(Collection):
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName StreamingFromCollection
 * @date 2020/9/16 13:49
 **/
public class StreamingFromCollection {

    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);

        // specify the data source
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        // process the data with map
//        DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
//            @Override
//            public Integer map(Integer value) throws Exception {
//                return value + 1;
//            }
//        });

        // process the data with map
        // the String in DataStream<String> is the final result type returned to the system;
        // it must match the second type argument of MapFunction<Integer, String>
        // and the return type of public String map(Integer value)
        DataStream<String> num = collectionData.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer value) throws Exception {
                return value + 1 + "_suffix";
            }
        });

        // print directly
        num.print().setParallelism(1);

        env.execute("StreamingFromCollection");
    }
}
```

1.10.2.3. Built-in DataStream connectors
A number of basic sources and sinks are built into Flink. The predefined data sources can read from files, directories, sockets, collections, and iterators. The predefined data sinks can write to files, standard output (stdout), standard error (stderr), and sockets.
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Google PubSub (source/sink)
- JDBC (sink)
Using one of these connectors usually requires an additional third-party component, such as a data store or a message queue. Note that although the connectors listed here are part of the Flink project and included in the source release, they are not included in the binary distribution.
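To show what wiring up one of these connectors looks like, here is a minimal sketch of a Kafka source based on the flink-connector-kafka dependency from the pom.xml above; the broker address, topic name, and consumer group id are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hadoop100:9092"); // placeholder broker address
        props.setProperty("group.id", "flink-demo-group");        // placeholder consumer group

        // the Kafka consumer participates in checkpointing, which is what
        // yields the exactly-once source guarantee listed in the table below
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        DataStreamSource<String> stream = env.addSource(consumer);
        stream.print();

        env.execute("KafkaSourceDemo");
    }
}
```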
1.10.2.4. Source fault-tolerance guarantees
| Source | Guarantee | Notes |
| --- | --- | --- |
| Apache Kafka | exactly once | use the Kafka connector appropriate for your Kafka version |
| AWS Kinesis Streams | exactly once | |
| RabbitMQ | at most once (v0.10) / exactly once (v1.0) | |
| Twitter Streaming API | at most once | |
| Google PubSub | at least once | |
| Collections | exactly once | |
| Files | exactly once | |
| Sockets | at most once | |
1.10.2.5. Sink fault-tolerance guarantees

To achieve end-to-end exactly-once delivery (going one step beyond exactly-once state semantics), the sink needs to take part in checkpointing. The table below lists the delivery guarantees of Flink's bundled sinks (assuming exactly-once state updates).
| Sink | Guarantee | Notes |
| --- | --- | --- |
| HDFS BucketingSink | exactly once | implementation depends on the Hadoop version |
| Elasticsearch | at least once | |
| Kafka producer | at least once / exactly once | exactly once with transactional producers (v0.11+) |
| Cassandra sink | at least once / exactly once | |
| AWS Kinesis Streams | at least once | |
| File sinks | exactly once | |
| Socket sinks | at least once | |
| Standard output | at least once | |
| Redis sink | at least once | |
1.10.2.6. Custom sinks

To implement a custom sink:

- implement the SinkFunction interface
- or extend RichSinkFunction (see the sketch after this list)
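A minimal sketch of the second option follows; the class name and the println body are illustrative, and a real sink would acquire its external connection in open() and release it in close():

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * A hypothetical custom sink that simply prints each record.
 * open()/close() are where a real sink would acquire and release
 * external resources such as a database connection.
 */
public class MyPrintSink extends RichSinkFunction<Long> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // acquire external resources here (e.g. a connection pool)
    }

    @Override
    public void invoke(Long value, Context context) throws Exception {
        // called once per record
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // release external resources here
        super.close();
    }
}
```

It would be attached to a stream the same way as the Redis sink later in this section, e.g. `stream.addSink(new MyPrintSink())`.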
1.10.2.7. Table & SQL Connectors

- Formats
- Kafka
- JDBC
- Elasticsearch
- FileSystem
- HBase
- DataGen (see the sketch after this list)
- BlackHole
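As a small illustration of this connector family, here is a sketch that uses the DataGen connector (available from Flink 1.11) through SQL DDL. It assumes the Table API dependencies mentioned earlier, and the table name and schema are made up for the example:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DataGenDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // a hypothetical table backed by the DataGen connector, which generates random rows in memory
        tEnv.executeSql(
                "CREATE TABLE demo_orders (" +
                "  order_id BIGINT," +
                "  price DOUBLE" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'rows-per-second' = '5'" +
                ")");

        // stream the generated rows to the console (runs until cancelled)
        tEnv.executeSql("SELECT * FROM demo_orders").print();
    }
}
```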
1.10.2.8. Custom sources

- Custom source with parallelism 1:
  - implement SourceFunction
  - fault-tolerance guarantees usually do not need to be implemented
  - handle the cancel() method properly (it is called when the application is cancelled)
- Parallel custom source:
  - implement ParallelSourceFunction
  - or extend RichParallelSourceFunction

SourceFunctions that extend RichParallelSourceFunction are executed in parallel and may hold resources that need to be opened and closed (hence the open/close methods).
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * broadcast partitioning
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class StreamingDemoWithMyNoPralalleSourceBroadcast {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // get the data source
        // note: for this source the parallelism can only be set to 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                long id = Thread.currentThread().getId();
                System.out.println("thread id: " + id + ", received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();
        env.execute(jobName);
    }
}
```

1.10.2.9. DataStream API: Transformations in detail
- map: takes one element and returns one element; a good place for cleaning and transformation work
- flatmap: takes one element and returns zero, one, or more elements
- filter: evaluates a predicate on each incoming element; elements that match the condition are kept
- keyBy: groups the stream by the specified key; records with the same key end up in the same partition (typical usages below)
- reduce: aggregates the data by combining the current element with the value returned by the previous reduce call and emitting the new value
- aggregations: sum(), min(), max(), etc.
- window: covered separately later
- Union: merges multiple streams; the new stream contains all elements of the input streams, with the restriction that all merged streams must have the same type
- Connect: similar to union, but connects only two streams; the two streams may have different data types, and a different processing function is applied to each stream
- CoMap, CoFlatMap: the map/flatMap counterparts used on ConnectedStreams
- Split: splits one stream into multiple streams according to given rules
- Select: used together with split to select one of the split streams
Two typical usages:

```java
dataStream.keyBy("someKey"); // use the "someKey" field of the objects as the grouping key
dataStream.keyBy(0);         // use the first element of the Tuple as the grouping key
```
Note: the following types cannot be used as keys:

1. a POJO that does not override hashCode() and falls back to Object.hashCode()
2. arrays of any kind
3. basic data types such as int and long

A sketch combining several of the transformations above follows.
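This is a minimal sketch chaining filter, map, keyBy, and reduce, reusing the MyNoParalleSource class defined earlier (assumed to be on the classpath); the parity bucketing is purely illustrative:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationsDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> nums = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Tuple2<String, Long>> result = nums
                // filter: keep even numbers only
                .filter(value -> value % 2 == 0)
                // map: tag each number with a bucket name
                .map(new MapFunction<Long, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Long value) {
                        return new Tuple2<>("even", value);
                    }
                })
                // keyBy: group by the tag (first tuple field)
                .keyBy(0)
                // reduce: running sum per key
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> a, Tuple2<String, Long> b) {
                        return new Tuple2<>(a.f0, a.f1 + b.f1);
                    }
                });

        result.print().setParallelism(1);
        env.execute("TransformationsDemo");
    }
}
```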
1.10.2.10. DataStream API: partitioning
- Random partitioning: randomly redistributes the data
  - dataStream.shuffle()
- Rebalancing: repartitions the data set to even out the load and remove data skew
  - dataStream.rebalance()
- Rescaling: explained below
  - dataStream.rescale()
- Custom partitioning: implement the Partitioner interface, then
  - dataStream.partitionCustom(partitioner, "someKey")
  - or dataStream.partitionCustom(partitioner, 0)
- Broadcasting: covered separately later
How rescaling works, by example:

If the upstream operator has parallelism 2 and the downstream operator has parallelism 4, one upstream subtask distributes its elements to two of the downstream subtasks, and the other upstream subtask distributes its elements to the other two. Conversely, if the downstream operator has parallelism 2 and the upstream operator has parallelism 4, two upstream subtasks feed one downstream subtask, and the other two upstream subtasks feed the other.

Difference between Rescaling and Rebalancing: Rebalancing performs a full repartition across all subtasks, whereas Rescaling does not.
A custom partitioning example:
```java
import org.apache.flink.api.common.functions.Partitioner;

/**
 * Created by xxxx on 2020/10/09
 */
public class MyPartition implements Partitioner<Long> {
    @Override
    public int partition(Long key, int numPartitions) {
        System.out.println("total number of partitions: " + numPartitions);
        if (key % 2 == 0) {
            return 0;
        } else {
            return 1;
        }
    }
}
```

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * Uses a custom partitioner that partitions the numbers by parity (odd/even).
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class SteamingDemoWithMyParitition {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());

        // convert the Long values into Tuple1 so a tuple field can serve as the partition key
        DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<>(value);
            }
        });

        // the data after partitioning
        DataStream<Tuple1<Long>> partitionData = tupleData.partitionCustom(new MyPartition(), 0);

        DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
            @Override
            public Long map(Tuple1<Long> value) throws Exception {
                System.out.println("current thread id: " + Thread.currentThread().getId() + ", value: " + value);
                return value.getField(0);
            }
        });

        result.print().setParallelism(1);

        env.execute("SteamingDemoWithMyParitition");
    }
}
```

The Scala version:
```scala
import org.apache.flink.api.common.functions.Partitioner

/**
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
class MyPartitionerScala extends Partitioner[Long] {
  override def partition(key: Long, numPartitions: Int) = {
    println("total number of partitions: " + numPartitions)
    if (key % 2 == 0) {
      0
    } else {
      1
    }
  }
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import xxx.xxx.streaming.custormSource.MyNoParallelSourceScala

/**
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
object StreamingDemoMyPartitionerScala {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    // wrap the Long values in a tuple (note how Tuple1 is created)
    val tupleData = text.map(line => {
      Tuple1(line)
    })

    val partitionData = tupleData.partitionCustom(new MyPartitionerScala, 0)

    val result = partitionData.map(line => {
      println("current thread id: " + Thread.currentThread().getId + ", value: " + line)
      line._1
    })

    result.print().setParallelism(1)

    env.execute("StreamingDemoMyPartitionerScala")
  }
}
```

1.10.2.11. DataStream API: Data Sinks
- writeAsText(): writes the elements line by line as strings, obtained by calling each element's toString() method
- print() / printToErr(): prints each element's toString() value to standard output or standard error (see the sketch below)
- custom output: addSink (e.g. Kafka, Redis)
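Before the Redis case below, here is a minimal sketch of the built-in text and print sinks; the output path is a placeholder:

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SimpleSinksDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> nums = env.fromElements(1L, 2L, 3L);

        // write each element's toString() as one line to the given path (placeholder)
        nums.writeAsText("file:///tmp/flink-demo-output");

        // print each element's toString() to stdout / stderr
        nums.print();
        nums.printToErr();

        env.execute("SimpleSinksDemo");
    }
}
```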
An example of a Redis sink:
```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Receives socket data and stores it in a Redis list:
 *
 * lpush list_key value
 *
 * Created by xxxx on 2020/10/09 on 2018/10/23.
 */
public class StreamingDemoToRedis {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");

        // lpush l_words word
        // wrap each String into a Tuple2<String, String> of (key, value)
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });

        // create the Redis configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();

        // create the Redis sink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());

        l_wordsData.addSink(redisSink);

        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // extract the Redis key to operate on from the incoming record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // extract the Redis value to operate on from the incoming record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}
```

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Created by xxxx on 2020/10/09 .
 */
object StreamingDataToRedisScala {

  def main(args: Array[String]): Unit = {
    // the socket port
    val port = 9000

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // connect to the socket to obtain the input data
    val text = env.socketTextStream("hadoop100", port, '\n')

    // note: this implicit import is required, otherwise the transformations below will not compile
    import org.apache.flink.api.scala._

    val l_wordsData = text.map(line => ("l_words_scala", line))

    val conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build()

    val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper)

    l_wordsData.addSink(redisSink)

    // run the job
    env.execute("Socket window count")
  }

  class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
    override def getKeyFromData(data: (String, String)) = {
      data._1
    }

    override def getValueFromData(data: (String, String)) = {
      data._2
    }

    override def getCommandDescription = {
      new RedisCommandDescription(RedisCommand.LPUSH)
    }
  }
}
```