1.10. Flink DataStream API (API abstraction levels, Data Sources, connectors, Source fault-tolerance guarantees, Sink fault-tolerance guarantees, custom sinks, partitioning, etc.)


1.10. Flink DataStream API
1.10.1. Flink API abstraction levels
1.10.2. Data Sources in detail
1.10.2.1. DataStream API: Data Sources
1.10.2.2. Data Sources API
1.10.2.3. Built-in DataStream connectors
1.10.2.4. Source fault-tolerance guarantees
1.10.2.5. Sink fault-tolerance guarantees
1.10.2.6. Custom sinks
1.10.2.7. Table & SQL Connectors
1.10.2.8. Custom sources
1.10.2.9. DataStream API: Transformations
1.10.2.10. DataStream API: partitioning
1.10.2.11. DataStream API: Data Sinks

1.10. Flink DataStream API

1.10.1. Flink API abstraction levels


The lowest level of abstraction in the Flink API is stateful real-time stream processing. Its abstraction is the Process Function, which the Flink framework integrates into the DataStream API for us to use. It allows users to freely process events (data) from one or more streams and provides globally consistent, fault-tolerant state. In addition, users can register event-time and processing-time callbacks at this level, which allows programs to implement sophisticated computations.

The second level of abstraction is the Core APIs. In practice, many applications do not need the lowest-level abstraction described above and can instead program against the Core APIs, which consist of the DataStream API (for bounded/unbounded data streams) and the DataSet API (for bounded data sets). These fluent APIs provide common building blocks for data processing, such as various forms of user-defined transformations, joins, aggregations, windows, and state operations. The data types processed at this level have corresponding classes in each programming language.

The integration of low-level abstractions like the Process Function with the DataStream API means users can drop down to the lower-level API whenever their requirements demand it. The DataSet API additionally offers primitives such as loops/iterations over bounded data sets.

The third level of abstraction is the Table API. The Table API is a declarative DSL centered on tables; in the streaming case, a table represents data that is changing dynamically. The Table API follows the (extended) relational model: a table has a schema (analogous to a schema in a relational database), and the API offers the operations of the relational model, such as select, project, join, group-by, and aggregate. A Table API program declaratively defines what logical operation should be performed rather than specifying exactly what the code for the operation looks like. Although the Table API is concise and can be extended with various kinds of user-defined functions, it is less expressive than the Core APIs. In addition, Table API programs are run through an optimizer that applies optimization rules to the user's expressions before execution.

Tables and DataStream/DataSet can be converted back and forth seamlessly, and Flink allows programs to mix the Table API with the DataStream and DataSet APIs.

The topmost abstraction in the Flink API is SQL. This level is similar to the Table API in both semantics and expressiveness, but represents programs as SQL query expressions. The SQL abstraction interacts closely with the Table API, and SQL queries can be executed over tables defined in the Table API.
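To make the Table API and SQL layers concrete, here is a minimal sketch that registers a stream as a view and queries it with SQL. It is only a sketch against the Flink 1.11 APIs used in this document: it additionally assumes the flink-table-api-java-bridge and a table planner dependency (neither is in the pom below), and the field name "word" is made up for illustration.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TableSqlSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // expose a DataStream to the Table API (the field name "word" is hypothetical)
        Table words = tEnv.fromDataStream(env.fromElements("a", "b", "a"), "word");
        tEnv.createTemporaryView("words", words);

        // the SQL layer runs declarative queries over tables defined via the Table API
        Table counts = tEnv.sqlQuery("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word");
        counts.execute().print();
    }
}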

The pom.xml needed by the examples in this document is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>xxxxx.demo</groupId>
    <artifactId>flink-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- maven properties -->
        <maven.test.skip>false</maven.test.skip>
        <maven.javadoc.skip>false</maven.javadoc.skip>
        <!-- compiler settings properties -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <rocketmq.version>4.7.1</rocketmq.version>
        <flink.version>1.11.1</flink.version>
        <commons-lang.version>2.5</commons-lang.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <distributionManagement>
        <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://ip/nexus/content/repositories/releases/</url>
        </repository>
        <snapshotRepository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://ip/nexus/content/repositories/snapshots/</url>
        </snapshotRepository>
    </distributionManagement>

    <repositories>
        <repository>
            <id>releases</id>
            <layout>default</layout>
            <url>http://ip/nexus/content/repositories/releases/</url>
        </repository>
        <repository>
            <id>snapshots</id>
            <name>snapshots</name>
            <url>http://ip/nexus/content/repositories/snapshots/</url>
            <snapshots>
                <enabled>true</enabled>
                <updatePolicy>always</updatePolicy>
                <checksumPolicy>warn</checksumPolicy>
            </snapshots>
        </repository>
        <repository>
            <id>tianque</id>
            <name>tianque</name>
            <url>http://ip/nexus/content/repositories/tianque/</url>
        </repository>
        <repository>
            <id>public</id>
            <name>public</name>
            <url>http://ip/nexus/content/groups/public/</url>
        </repository>
        <!-- newly added -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--
            Maven scopes:
            1. compile : the default scope; needed at runtime and packaged into the jar.
            2. provided: needed at compile time; supplied by the runtime, so not packaged.
            3. runtime : not needed for compilation, needed at runtime, and packaged
                         (interface/implementation separation).
            4. test    : needed only for tests; not packaged.
            5. system  : resolved from a path on the local system rather than a repository
                         (rarely used).
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.tianque.doraemon</groupId>
            <artifactId>issue-business-api</artifactId>
            <version>1.0.6.RELEASE</version>
        </dependency>
        <!-- dependencies needed when programming in Scala: start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- dependencies needed when programming in Scala: end -->
        <!-- kafka connector, scala 2.12 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>${rocketmq.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-tcnative</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>${commons-lang.version}</version>
        </dependency>
        <!-- test -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-module-junit4</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.powermock</groupId>
            <artifactId>powermock-api-mockito</artifactId>
            <version>1.5.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-namesrv</artifactId>
            <version>${rocketmq.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-broker</artifactId>
            <version>${rocketmq.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.tianque</groupId>
            <artifactId>caterpillar-sdk</artifactId>
            <version>0.1.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>${maven.compiler.source}</source>
                    <target>${maven.compiler.target}</target>
                    <encoding>UTF-8</encoding>
                    <compilerVersion>${maven.compiler.source}</compilerVersion>
                    <showDeprecation>true</showDeprecation>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.12.4</version>
                <configuration>
                    <skipTests>${maven.test.skip}</skipTests>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
                <version>0.12</version>
                <configuration>
                    <excludes>
                        <exclude>README.md</exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-checkstyle-plugin</artifactId>
                <version>2.17</version>
                <executions>
                    <execution>
                        <id>verify</id>
                        <phase>verify</phase>
                        <configuration>
                            <configLocation>style/rmq_checkstyle.xml</configLocation>
                            <encoding>UTF-8</encoding>
                            <consoleOutput>true</consoleOutput>
                            <failsOnError>true</failsOnError>
                            <includeTestSourceDirectory>false</includeTestSourceDirectory>
                            <includeTestResources>false</includeTestResources>
                        </configuration>
                        <goals>
                            <goal>check</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-javadoc-plugin</artifactId>
                <version>2.10.4</version>
                <configuration>
                    <aggregate>true</aggregate>
                    <reportOutputDirectory>javadocs</reportOutputDirectory>
                    <locale>en</locale>
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.1.6</version>
                <configuration>
                    <scalaCompatVersion>2.11</scalaCompatVersion>
                    <scalaVersion>2.11.12</scalaVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- jar packaging plugin (bundles all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- optionally set the jar's main class; adjust for your own project -->
                            <mainClass>xxxxx.SocketWindowWordCountJava</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

1.10.2. Data Sources in detail

1.10.2.1. DataStream API: Data Sources

A source is the data input of a program. You can add a source to your program with StreamExecutionEnvironment.addSource(sourceFunction).

Flink provides a large number of ready-made source implementations, and you can also define your own source:

  • implement the SourceFunction interface to define a source with parallelism 1, or
  • implement the ParallelSourceFunction interface or extend RichParallelSourceFunction to define a source with configurable parallelism.

The following shows how to implement custom sources.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * A custom source with parallelism 1.
 *
 * Emits an increasing sequence of numbers starting from 1.
 *
 * Note:
 * SourceFunction and SourceContext both need an explicit data type; otherwise the job fails at runtime with:
 * Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
 * The types of the interface org.apache.flink.streaming.api.functions.source.SourceFunction could not be inferred.
 * Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
 *
 * Created by xxxx on 2020/10/09.
 */
public class MyNoParalleSource implements SourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: starts the source.
     * In most cases this run method contains a loop so that data is produced continuously.
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // emit one record per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the job is cancelled.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

The parallel variants in Java (the complete Scala implementations appear later in this section):

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

/**
 * A custom source that supports parallel execution.
 * Created by xxxx on 2020/10/09.
 */
public class MyParalleSource implements ParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: starts the source.
     * In most cases this run method contains a loop so that data is produced continuously.
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // emit one record per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the job is cancelled.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/**
 * A custom source that supports parallel execution.
 *
 * RichParallelSourceFunction additionally provides open and close methods:
 * if the source needs external resources, acquire the connection in open and release it in close.
 *
 * Created by xxxx on 2020/10/09.
 */
public class MyRichParalleSource extends RichParallelSourceFunction<Long> {

    private long count = 1L;
    private boolean isRunning = true;

    /**
     * The main method: starts the source.
     * In most cases this run method contains a loop so that data is produced continuously.
     */
    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count++;
            // emit one record per second
            Thread.sleep(1000);
        }
    }

    /**
     * Called when the job is cancelled.
     */
    @Override
    public void cancel() {
        isRunning = false;
    }

    /**
     * Called exactly once before the source starts; acquire connections here.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("open.............");
        super.open(parameters);
    }

    /**
     * Release connections here.
     */
    @Override
    public void close() throws Exception {
        super.close();
    }
}

Using the custom sources:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses the source with parallelism 1.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoWithMyNoPralalleSource {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data source
        // note: for this source the parallelism can only be set to 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses a source with a parallelism greater than 1.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoWithMyPralalleSource {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data source
        DataStreamSource<Long> text = env.addSource(new MyParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Uses a rich source with a parallelism greater than 1.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoWithMyRichPralalleSource {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data source
        DataStreamSource<Long> text = env.addSource(new MyRichParalleSource()).setParallelism(2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
        env.execute(jobName);
    }
}

The Scala implementations:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom source with parallelism 1.
 * Emits an increasing sequence of numbers starting from 1.
 * Created by xxxx on 2020/10/09.
 */
class MyNoParallelSourceScala extends SourceFunction[Long] {

  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }
}

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom parallel source.
 * Emits an increasing sequence of numbers starting from 1.
 * Created by xxxx on 2020/10/09.
 */
class MyParallelSourceScala extends ParallelSourceFunction[Long] {

  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }
}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom parallel source with open/close hooks for external resources.
 * Emits an increasing sequence of numbers starting from 1.
 * Created by xxxx on 2020/10/09.
 */
class MyRichParallelSourceScala extends RichParallelSourceFunction[Long] {

  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel() = {
    isRunning = false
  }

  override def open(parameters: Configuration): Unit = super.open(parameters)

  override def close(): Unit = super.close()
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoWithMyNoParallelSourceScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyNoParallelSourceScala")
  }
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoWithMyParallelSourceScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyParallelSourceScala).setParallelism(2)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyParallelSourceScala")
  }
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoWithMyRichParallelSourceScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyRichParallelSourceScala).setParallelism(2)

    val mapData = text.map(line => {
      println("received: " + line)
      line
    })

    val sum = mapData.timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoWithMyRichParallelSourceScala")
  }
}

1.10.2.2. Data Sources API

Based on files:

  • readTextFile(path): reads a text file following the TextInputFormat rules, line by line, returning each line.

Based on sockets:

  • socketTextStream: reads data from a socket; elements can be split by a delimiter. (A sketch of the file- and socket-based sources follows this list.)

Based on collections:

  • fromCollection(Collection): creates a data stream from a Java collection; all elements in the collection must be of the same type.

Custom input:

  • addSource can read data from third-party data sources.
  • Flink ships with a set of built-in connectors that provide the corresponding sources (e.g. Kafka).
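A minimal sketch of the file- and socket-based sources listed above; the file path and the socket host/port are made-up examples:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BuiltInSourcesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // file-based: read a text file line by line (hypothetical path)
        DataStreamSource<String> fileLines = env.readTextFile("/tmp/input.txt");

        // socket-based: read newline-delimited strings (hypothetical host/port)
        DataStreamSource<String> socketLines = env.socketTextStream("localhost", 9000, "\n");

        fileLines.print();
        socketLines.print();

        env.execute("BuiltInSourcesSketch");
    }
}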

Collection-based example, fromCollection(Collection):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

/**
 * @author tuzuoquan
 * @version 1.0
 * @ClassName StreamingFromCollection
 * @date 2020/9/16 13:49
 **/
public class StreamingFromCollection {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        ArrayList<Integer> data = new ArrayList<>();
        data.add(10);
        data.add(15);
        data.add(20);

        // specify the data source
        DataStreamSource<Integer> collectionData = env.fromCollection(data);

        // process the data with map
//        DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {
//            @Override
//            public Integer map(Integer value) throws Exception {
//                return value + 1;
//            }
//        });

        // process the data with map:
        // the String in DataStream<String> is the final result type,
        // which must match the String in MapFunction<Integer, String>
        // and the return type of public String map(Integer value)
        DataStream<String> num = collectionData.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer value) throws Exception {
                return value + 1 + "_suffix";
            }
        });

        // print directly
        num.print().setParallelism(1);

        env.execute("StreamingFromCollection");
    }
}

1.10.2.3. Built-in DataStream connectors

Some fairly basic sources and sinks are built into Flink. The predefined data sources support reading from files, directories, and sockets, as well as from collections and iterators. The predefined data sinks support writing to files, standard output (stdout), standard error (stderr), and sockets.

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)
  • Google PubSub (source/sink)
  • JDBC (sink)

Using one of these connectors usually requires an additional third-party component, such as a data store or a message queue. Note that while these connectors are part of the Flink project and are included in the source release, they are not included in the binary distribution.

1.10.2.4. Source fault-tolerance guarantees

Source | Guarantees | Notes
Apache Kafka | exactly once | use the Kafka connector appropriate for your version
AWS Kinesis Streams | exactly once |
RabbitMQ | at most once (v 0.10) / exactly once (v 1.0) |
Twitter Streaming API | at most once |
Google PubSub | at least once |
Collections | exactly once |
Files | exactly once |
Sockets | at most once |

To guarantee end-to-end exactly-once delivery (going one step beyond exactly-once state semantics), the sink needs to participate in checkpointing. The table below lists the delivery guarantees of Flink's bundled sinks (assuming exactly-once state updates).

1.10.2.5. Sink fault-tolerance guarantees

Sink | Guarantees | Notes
HDFS BucketingSink | exactly once | the implementation depends on the Hadoop version
Elasticsearch | at least once |
Kafka producer | at least once / exactly once | exactly once with transactional producers (v 0.11+)
Cassandra sink | at least once / exactly once | exactly once only for idempotent updates
AWS Kinesis Streams | at least once |
File sinks | exactly once |
Socket sinks | at least once |
Standard output | at least once |
Redis sink | at least once |

1.10.2.6. Custom sinks

To implement a custom sink (a minimal sketch follows this list):

  • implement the SinkFunction interface
  • or extend RichSinkFunction
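A minimal custom-sink sketch along the second option: it extends RichSinkFunction so that open/close can manage an external connection. The "external resource" here is only a placeholder comment, not a real client, and the sink just prints:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyPrintSink extends RichSinkFunction<Long> {

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // acquire external resources (connections, clients) here
    }

    @Override
    public void invoke(Long value, Context context) throws Exception {
        // called once per record; write the record to the external system here
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // release external resources here
        super.close();
    }
}

It is attached to a stream like any built-in sink: stream.addSink(new MyPrintSink()).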

1.10.2.7. Table & SQL Connectors

  • Formats
  • Kafka
  • JDBC
  • Elasticsearch
  • FileSystem
  • HBase
  • DataGen
  • Print
  • BlackHole

1.10.2.8. Custom sources

To implement a custom source with parallelism 1:

  • implement SourceFunction
  • fault-tolerance guarantees usually do not need to be implemented
  • handle the cancel method properly (it is called when the job is cancelled)

To implement a parallel custom source:

  • implement ParallelSourceFunction
  • or extend RichParallelSourceFunction

SourceFunctions that extend RichParallelSourceFunction are executed in parallel and may have resources that need to be opened and closed (via the open/close hooks).

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * broadcast partitioning.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoWithMyNoPralalleSourceBroadcast {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // get the data source
        // note: for this source the parallelism can only be set to 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.broadcast().map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                long id = Thread.currentThread().getId();
                System.out.println("thread id: " + id + ", received: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoWithMyNoPralalleSourceBroadcast.class.getSimpleName();
        env.execute(jobName);
    }
}

1.10.2.9. DataStream API: Transformations

  • map: takes one element and returns one element; useful for cleansing and transformation.
  • flatmap: takes one element and returns zero, one, or more elements.
  • filter: applies a predicate to each incoming element; elements that satisfy it are kept.
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * Filter demo.
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoFilter {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data source
        // note: for this source the parallelism can only be set to 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received raw: " + value);
                return value;
            }
        });

        // apply the filter; elements that satisfy the predicate are kept
        DataStream<Long> filterData = num.filter(new FilterFunction<Long>() {
            // filter out all odd numbers
            @Override
            public boolean filter(Long value) throws Exception {
                return value % 2 == 0;
            }
        });

        DataStream<Long> resultData = filterData.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("after filter: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = resultData.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoFilter.class.getSimpleName();
        env.execute(jobName);
    }
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import xxx.xxx.streaming.custormSource.MyNoParallelSourceScala

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoFilterScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    val mapData = text.map(line => {
      println("received raw: " + line)
      line
    }).filter(_ % 2 == 0)

    val sum = mapData.map(line => {
      println("after filter: " + line)
      line
    }).timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoFilterScala")
  }
}
  • keyBy: groups the stream by the given key; records with the same key go to the same partition (typical usages in the notes below, followed by a keyBy/reduce sketch).
  • reduce: aggregates the stream by combining the current element with the previously reduced value and returning a new value.
  • aggregations: sum(), min(), max(), etc.
  • window: covered in detail later.
  • Union: merges multiple streams; the new stream contains the data of all input streams, with the restriction that all merged streams must be of the same type.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * union
 * Merges multiple streams; the new stream contains the data of all input streams,
 * with the restriction that all merged streams must be of the same type.
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoUnion {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data sources
        // note: for this source the parallelism can only be set to 1
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // combine text1 and text2 into one stream
        DataStream<Long> text = text1.union(text2);

        DataStream<Long> num = text.map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long value) throws Exception {
                System.out.println("received raw: " + value);
                return value;
            }
        });

        // process the data every 2 seconds
        DataStream<Long> sum = num.timeWindowAll(Time.seconds(2)).sum(0);

        // print the result
        sum.print().setParallelism(1);

        String jobName = StreamingDemoUnion.class.getSimpleName();
        env.execute(jobName);
    }
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import xxx.xxx.streaming.custormSource.MyNoParallelSourceScala

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoUnionScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text1 = env.addSource(new MyNoParallelSourceScala)
    val text2 = env.addSource(new MyNoParallelSourceScala)

    val unionall = text1.union(text2)

    val sum = unionall.map(line => {
      println("received: " + line)
      line
    }).timeWindowAll(Time.seconds(2)).sum(0)

    sum.print().setParallelism(1)

    env.execute("StreamingDemoUnionScala")
  }
}
  • Connect: similar to union, but only connects two streams; the two streams may have different data types, and a different processing method is applied to the data of each stream.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * connect
 * Similar to union, but only connects two streams; the two streams may have
 * different data types, and a different processing method is applied to each stream.
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoConnect {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data sources
        // note: for this source the parallelism can only be set to 1
        DataStreamSource<Long> text1 = env.addSource(new MyNoParalleSource()).setParallelism(1);
        DataStreamSource<Long> text2 = env.addSource(new MyNoParalleSource()).setParallelism(1);

        SingleOutputStreamOperator<String> text2_str = text2.map(new MapFunction<Long, String>() {
            @Override
            public String map(Long value) throws Exception {
                return "str_" + value;
            }
        });

        ConnectedStreams<Long, String> connectStream = text1.connect(text2_str);

        SingleOutputStreamOperator<Object> result = connectStream.map(new CoMapFunction<Long, String, Object>() {
            @Override
            public Object map1(Long value) throws Exception {
                return value;
            }

            @Override
            public Object map2(String value) throws Exception {
                return value;
            }
        });

        // print the result
        result.print().setParallelism(1);

        String jobName = StreamingDemoConnect.class.getSimpleName();
        env.execute(jobName);
    }
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import xxx.xxx.streaming.custormSource.MyNoParallelSourceScala

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoConnectScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text1 = env.addSource(new MyNoParallelSourceScala)
    val text2 = env.addSource(new MyNoParallelSourceScala)

    val text2_str = text2.map("str" + _)

    val connectedStreams = text1.connect(text2_str)

    val result = connectedStreams.map(line1 => {
      line1
    }, line2 => {
      line2
    })

    result.print().setParallelism(1)

    env.execute("StreamingDemoConnectScala")
  }
}
  • CoMap, CoFlatMap: the map/flatmap counterparts used on ConnectedStreams.
  • Split: splits one data stream into multiple streams according to some rule.
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

import java.util.ArrayList;

/**
 * split
 *
 * Splits one data stream into multiple streams according to some rule.
 *
 * Use case: in practice a source stream may mix several similar kinds of data whose
 * processing rules differ, so the stream can be split by rule into multiple streams,
 * each of which can then use its own processing logic.
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoSplit {
    public static void main(String[] args) throws Exception {
        // get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get the data source
        // note: for this source the parallelism can only be set to 1
        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        // split the stream by whether the number is odd or even
        SplitStream<Long> splitStream = text.split(new OutputSelector<Long>() {
            @Override
            public Iterable<String> select(Long value) {
                ArrayList<String> outPut = new ArrayList<>();
                if (value % 2 == 0) {
                    outPut.add("even"); // even numbers
                } else {
                    outPut.add("odd");  // odd numbers
                }
                return outPut;
            }
        });

        // select one or more of the split streams
        DataStream<Long> evenStream = splitStream.select("even");
        DataStream<Long> oddStream = splitStream.select("odd");
        DataStream<Long> moreStream = splitStream.select("odd", "even");

        // print the result
        moreStream.print().setParallelism(1);

        String jobName = StreamingDemoSplit.class.getSimpleName();
        env.execute(jobName);
    }
}

import java.util

import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import xxx.xxx.streaming.custormSource.MyNoParallelSourceScala

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoSplitScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    val splitStream = text.split(new OutputSelector[Long] {
      override def select(value: Long) = {
        val list = new util.ArrayList[String]()
        if (value % 2 == 0) {
          list.add("even") // even numbers
        } else {
          list.add("odd")  // odd numbers
        }
        list
      }
    })

    val evenStream = splitStream.select("even")

    evenStream.print().setParallelism(1)

    env.execute("StreamingDemoSplitScala")
  }
}
  • Select: used together with split to select one or more of the split streams.

Two typical usages:
dataStream.keyBy("someKey") // use the "someKey" field of the objects as the grouping key
dataStream.keyBy(0) // use the first element of the Tuple as the grouping key

Note: the following types cannot be used as keys:

  1. an entity class that does not override hashCode and relies on Object's hashCode method
  2. an array of any type
  3. primitive types such as int and long
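As referenced above, here is a minimal keyBy/reduce sketch: it groups (word, count) tuples by Tuple field 0 and keeps a running sum per key. The sample data is made up for illustration:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByReduceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 1), Tuple2.of("a", 1))
                // records with the same Tuple field 0 go to the same partition
                .keyBy(0)
                // combine the current element with the previously reduced value
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a,
                                                          Tuple2<String, Integer> b) {
                        return Tuple2.of(a.f0, a.f1 + b.f1);
                    }
                })
                .print();

        env.execute("KeyByReduceSketch");
    }
}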

1.10.2.10. DataStream API: partitioning

Random partitioning: distributes records randomly

  • dataStream.shuffle()

Rebalancing: repartitions the data set evenly, eliminating data skew

  • dataStream.rebalance()

Rescaling: explained below

  • dataStream.rescale()

Custom partitioning:

  • implement the Partitioner interface
  • dataStream.partitionCustom(partitioner, "someKey")
  • or dataStream.partitionCustom(partitioner, 0)

Broadcasting: covered in detail later

Rescaling explained with an example: if the upstream operator runs with a parallelism of 2 and the downstream operator with a parallelism of 4, one upstream instance distributes its elements to two of the downstream instances, while the other upstream instance distributes to the remaining two. Conversely, if the downstream operator has a parallelism of 2 and the upstream has 4, two of the upstream instances feed one downstream instance, and the other two upstream instances feed the other.

Difference between Rescaling and Rebalancing:
Rebalancing performs a full repartition across all downstream subtasks, whereas Rescaling only redistributes within a local subset.
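A minimal sketch contrasting the three built-in repartitioning calls on the same stream; each call only changes how records are routed to the downstream subtasks:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RepartitionSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Long> nums = env.generateSequence(1, 100);

        nums.shuffle().print();   // random partitioning
        nums.rebalance().print(); // round-robin across all downstream subtasks
        nums.rescale().print();   // round-robin across a local subset of subtasks

        env.execute("RepartitionSketch");
    }
}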

Custom partitioning example:

import org.apache.flink.api.common.functions.Partitioner;

/**
 * Created by xxxx on 2020/10/09
 */
public class MyPartition implements Partitioner<Long> {
    @Override
    public int partition(Long key, int numPartitions) {
        System.out.println("total partitions: " + numPartitions);
        if (key % 2 == 0) {
            return 0;
        } else {
            return 1;
        }
    }
}

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import xxx.xxx.streaming.custormSource.MyNoParalleSource;

/**
 * Uses the custom partitioner to partition by whether the number is odd or even.
 * Created by xxxx on 2020/10/09.
 */
public class SteamingDemoWithMyParitition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());

        // convert the Long values into Tuple1
        DataStream<Tuple1<Long>> tupleData = text.map(new MapFunction<Long, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(Long value) throws Exception {
                return new Tuple1<>(value);
            }
        });

        // the data after partitioning
        DataStream<Tuple1<Long>> partitionData = tupleData.partitionCustom(new MyPartition(), 0);

        DataStream<Long> result = partitionData.map(new MapFunction<Tuple1<Long>, Long>() {
            @Override
            public Long map(Tuple1<Long> value) throws Exception {
                System.out.println("current thread id: " + Thread.currentThread().getId() + ", value: " + value);
                return value.getField(0);
            }
        });

        result.print().setParallelism(1);

        env.execute("SteamingDemoWithMyParitition");
    }
}

Scala example:

import org.apache.flink.api.common.functions.Partitioner

/**
 * Created by xxxx on 2020/10/09.
 */
class MyPartitionerScala extends Partitioner[Long] {
  override def partition(key: Long, numPartitions: Int) = {
    println("total partitions: " + numPartitions)
    if (key % 2 == 0) {
      0
    } else {
      1
    }
  }
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import xxx.xxx.streaming.custormSource.MyNoParallelSourceScala

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDemoMyPartitionerScala {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    // implicit conversions
    import org.apache.flink.api.scala._

    val text = env.addSource(new MyNoParallelSourceScala)

    // wrap the Long values in Tuple1 (note how Tuple1 is constructed)
    val tupleData = text.map(line => {
      Tuple1(line)
    })

    val partitionData = tupleData.partitionCustom(new MyPartitionerScala, 0)

    val result = partitionData.map(line => {
      println("current thread id: " + Thread.currentThread().getId + ", value: " + line)
      line._1
    })

    result.print().setParallelism(1)

    env.execute("StreamingDemoMyPartitionerScala")
  }
}

1.10.2.11. DataStream API: Data Sinks

  • writeAsText(): writes elements as strings line by line, obtained by calling toString() on each element (see the sketch below).
  • print() / printToErr(): prints the toString() value of each element to standard output or standard error.
  • custom output via addSink (e.g. Kafka, Redis).
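A minimal sketch of the built-in sinks listed above; the output path is a made-up example (with the default write mode, writeAsText fails if the path already exists):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BuiltInSinksSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Long> nums = env.generateSequence(1, 10);

        // write each element's toString() as one line (hypothetical path)
        nums.writeAsText("/tmp/flink-out").setParallelism(1);

        // print each element's toString() to stdout / stderr
        nums.print();
        nums.printToErr();

        env.execute("BuiltInSinksSketch");
    }
}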

Redis sink example:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * Receives socket data and stores it in a Redis list:
 *
 * lpush list_key value
 *
 * Created by xxxx on 2020/10/09.
 */
public class StreamingDemoToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.socketTextStream("hadoop100", 9000, "\n");

        // lpush l_words word
        // wrap each String into a Tuple2<String, String>
        DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String value) throws Exception {
                return new Tuple2<>("l_words", value);
            }
        });

        // create the Redis configuration
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();

        // create the Redis sink
        RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());

        l_wordsData.addSink(redisSink);

        env.execute("StreamingDemoToRedis");
    }

    public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
        // extract the Redis key from the incoming record
        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        // extract the Redis value from the incoming record
        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

/**
 * Created by xxxx on 2020/10/09.
 */
object StreamingDataToRedisScala {
  def main(args: Array[String]): Unit = {
    // socket port
    val port = 9000

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // connect to the socket to get the input data
    val text = env.socketTextStream("hadoop100", port, '\n')

    // note: this implicit import is required, otherwise the map call below fails to compile
    import org.apache.flink.api.scala._

    val l_wordsData = text.map(line => ("l_words_scala", line))

    val conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build()

    val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper)

    l_wordsData.addSink(redisSink)

    // run the job
    env.execute("Socket window count")
  }

  class MyRedisMapper extends RedisMapper[Tuple2[String, String]] {
    override def getKeyFromData(data: (String, String)) = {
      data._1
    }

    override def getValueFromData(data: (String, String)) = {
      data._2
    }

    override def getCommandDescription = {
      new RedisCommandDescription(RedisCommand.LPUSH)
    }
  }
}

