日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(十二):流批一体API Transformation

發布時間:2023/11/28 生活经验 40 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(十二):流批一体API Transformation 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

目錄

Transformation

官網API列表

基本操作-略

map

flatMap

keyBy

filter

sum

reduce

代碼演示

合并-拆分

union和connect

split、select和Side Outputs

分區

rebalance重平衡分區

其他分區


Transformation

官網API列表

Apache Flink 1.12 Documentation: Operators

整體來說,流式數據上的操作可以分為四類。

l第一類是對于單條記錄的操作,比如篩除掉不符合要求的記錄(Filter 操作),或者將每條記錄都做一個轉換(Map 操作)

l第二類是對多條記錄的操作。比如說統計一個小時內的訂單總成交量,就需要將一個小時內的所有訂單記錄的成交量加到一起。為了支持這種類型的操作,就得通過 Window 將需要的記錄關聯到一起進行處理

l第三類是對多個流進行操作并轉換為單個流。例如,多個流可以通過 Union、Join 或 Connect 等操作合到一起。這些操作合并的邏輯不同,但是它們最終都會產生了一個新的統一的流,從而可以進行一些跨流的操作。

l最后, DataStream 還支持與合并對稱的拆分操作,即把一個流按一定規則拆分為多個流(Split 操作),每個流是之前流的一個子集,這樣我們就可以對不同的流作不同的處理。

基本操作-略

map

  • API

map:將函數作用在集合中的每一個元素上,并返回作用后的結果

flatMap

  • API

flatMap:將集合中的每個元素變成一個或多個元素,并返回扁平化之后的結果

???????keyBy

按照指定的key來對流中的數據進行分組,前面入門案例中已經演示過

注意:

流處理中沒有groupBy,而是keyBy

???????filter

  • API

filter:按照指定的條件對集合中的元素進行過濾,過濾出返回true/符合條件的元素

???????sum

  • API

sum:按照指定的字段對集合中的元素進行求和

???????reduce

  • API

reduce:對集合中的元素進行聚合

???????代碼演示

  • 需求:

對流數據中的單詞進行統計,排除敏感詞TMD

  • 代碼演示
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Author lanson* Desc*/
public class TransformationDemo01 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.sourceDataStream<String> linesDS = env.socketTextStream("node1", 9999);//3.處理數據-transformationDataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {//value就是一行行的數據String[] words = value.split(" ");for (String word : words) {out.collect(word);//將切割處理的一個個的單詞收集起來并返回}}});DataStream<String> filtedDS = wordsDS.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !value.equals("heihei");}});DataStream<Tuple2<String, Integer>> wordAndOnesDS = filtedDS.map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {//value就是進來一個個的單詞return Tuple2.of(value, 1);}});//KeyedStream<Tuple2<String, Integer>, Tuple> groupedDS = wordAndOnesDS.keyBy(0);KeyedStream<Tuple2<String, Integer>, String> groupedDS = wordAndOnesDS.keyBy(t -> t.f0);DataStream<Tuple2<String, Integer>> result1 = groupedDS.sum(1);DataStream<Tuple2<String, Integer>> result2 = groupedDS.reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {return Tuple2.of(value1.f0, value1.f1 + value1.f1);}});//4.輸出結果-sinkresult1.print("result1");result2.print("result2");//5.觸發執行-executeenv.execute();}
}

???????合并-拆分

???????union和connect

  • API

union:

union算子可以合并多個同類型的數據流,并生成同類型的數據流,即可以將多個DataStream[T]合并為一個新的DataStream[T]。數據將按照先進先出(First In First Out)的模式合并,且不去重。

connect:

connect提供了和union類似的功能,用來連接兩個數據流,它與union的區別在于:

connect只能連接兩個數據流,union可以連接多個數據流。

connect所連接的兩個數據流的數據類型可以不一致,union所連接的兩個數據流的數據類型必須一致。

兩個DataStream經過connect之后被轉化為ConnectedStreams,ConnectedStreams會對兩個流的數據應用不同的處理方法,且雙流之間可以共享狀態。

  • 需求

將兩個String類型的流進行union

將一個String類型和一個Long類型的流進行connect

  • 代碼實現
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;/*** Author lanson* Desc*/
public class TransformationDemo02 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStream<String> ds1 = env.fromElements("hadoop", "spark", "flink");DataStream<String> ds2 = env.fromElements("hadoop", "spark", "flink");DataStream<Long> ds3 = env.fromElements(1L, 2L, 3L);//3.TransformationDataStream<String> result1 = ds1.union(ds2);//合并但不去重?https://blog.csdn.net/valada/article/details/104367378ConnectedStreams<String, Long> tempResult = ds1.connect(ds3);//interface CoMapFunction<IN1, IN2, OUT>DataStream<String> result2 = tempResult.map(new CoMapFunction<String, Long, String>() {@Overridepublic String map1(String value) throws Exception {return "String->String:" + value;}@Overridepublic String map2(Long value) throws Exception {return "Long->String:" + value.toString();}});//4.Sinkresult1.print();result2.print();//5.executeenv.execute();}
}

???????split、select和Side Outputs

  • API

Split就是將一個流分成多個流

Select就是獲取分流后對應的數據

注意:split函數已過期并移除

Side Outputs:可以使用process方法對流中數據進行處理,并針對不同的處理結果將數據收集到不同的OutputTag中

  • 需求:

對流中的數據按照奇數和偶數進行分流,并獲取分流后的數據

  • 代碼實現:
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** Author lanson* Desc*/
public class TransformationDemo03 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStreamSource<Integer> ds = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);//3.Transformation/*SplitStream<Integer> splitResult = ds.split(new OutputSelector<Integer>() {@Overridepublic Iterable<String> select(Integer value) {//value是進來的數字if (value % 2 == 0) {//偶數ArrayList<String> list = new ArrayList<>();list.add("偶數");return list;} else {//奇數ArrayList<String> list = new ArrayList<>();list.add("奇數");return list;}}});DataStream<Integer> evenResult = splitResult.select("偶數");DataStream<Integer> oddResult = splitResult.select("奇數");*///定義兩個輸出標簽OutputTag<Integer> tag_even = new OutputTag<Integer>("偶數", TypeInformation.of(Integer.class));OutputTag<Integer> tag_odd = new OutputTag<Integer>("奇數"){};//對ds中的數據進行處理SingleOutputStreamOperator<Integer> tagResult = ds.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {if (value % 2 == 0) {//偶數ctx.output(tag_even, value);} else {//奇數ctx.output(tag_odd, value);}}});//取出標記好的數據DataStream<Integer> evenResult = tagResult.getSideOutput(tag_even);DataStream<Integer> oddResult = tagResult.getSideOutput(tag_odd);//4.SinkevenResult.print("偶數");oddResult.print("奇數");//5.executeenv.execute();}
}

分區

rebalance重平衡分區

  • API

類似于Spark中的repartition,但是功能更強大,可以直接解決數據傾斜

Flink也有數據傾斜的時候,比如當前有數據量大概10億條數據需要處理,在處理過程中可能會發生如圖所示的狀況,出現了數據傾斜,其他3臺機器執行完畢也要等待機器1執行完畢后才算整體將任務完成;

所以在實際的工作中,出現這種情況比較好的解決方案就是rebalance(內部使用round robin方法將數據均勻打散)

  • 代碼演示:
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;/*** Author lanson* Desc*/
public class TransformationDemo04 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC).setParallelism(3);//2.sourceDataStream<Long> longDS = env.fromSequence(0, 100);//3.Transformation//下面的操作相當于將數據隨機分配一下,有可能出現數據傾斜DataStream<Long> filterDS = longDS.filter(new FilterFunction<Long>() {@Overridepublic boolean filter(Long num) throws Exception {return num > 10;}});//接下來使用map操作,將數據轉為(分區編號/子任務編號, 數據)//Rich表示多功能的,比MapFunction要多一些API可以供我們使用DataStream<Tuple2<Integer, Integer>> result1 = filterDS.map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(Long value) throws Exception {//獲取分區編號/子任務編號int id = getRuntimeContext().getIndexOfThisSubtask();return Tuple2.of(id, 1);}}).keyBy(t -> t.f0).sum(1);DataStream<Tuple2<Integer, Integer>> result2 = filterDS.rebalance().map(new RichMapFunction<Long, Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> map(Long value) throws Exception {//獲取分區編號/子任務編號int id = getRuntimeContext().getIndexOfThisSubtask();return Tuple2.of(id, 1);}}).keyBy(t -> t.f0).sum(1);//4.sink//result1.print();//有可能出現數據傾斜result2.print();//在輸出前進行了rebalance重分區平衡,解決了數據傾斜//5.executeenv.execute();}
}

???????其他分區

  • API

說明:

recale分區。基于上下游Operator的并行度,將記錄以循環的方式輸出到下游Operator的每個實例。

舉例:

上游并行度是2,下游是4,則上游一個并行度以循環的方式將記錄輸出到下游的兩個并行度上;上游另一個并行度以循環的方式將記錄輸出到下游另兩個并行度上。若上游并行度是4,下游并行度是2,則上游兩個并行度將記錄輸出到下游一個并行度上;上游另兩個并行度將記錄輸出到下游另一個并行度上。

  • 需求:

對流中的元素使用各種分區,并輸出

  • 代碼實現
package cn.it.transformation;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** Author lanson* Desc*/
public class TransformationDemo05 {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2.SourceDataStream<String> linesDS = env.readTextFile("data/input/words.txt");SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = linesDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = value.split(" ");for (String word : words) {out.collect(Tuple2.of(word, 1));}}});//3.TransformationDataStream<Tuple2<String, Integer>> result1 = tupleDS.global();DataStream<Tuple2<String, Integer>> result2 = tupleDS.broadcast();DataStream<Tuple2<String, Integer>> result3 = tupleDS.forward();DataStream<Tuple2<String, Integer>> result4 = tupleDS.shuffle();DataStream<Tuple2<String, Integer>> result5 = tupleDS.rebalance();DataStream<Tuple2<String, Integer>> result6 = tupleDS.rescale();DataStream<Tuple2<String, Integer>> result7 = tupleDS.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String key, int numPartitions) {return key.equals("hello") ? 0 : 1;}}, t -> t.f0);//4.sink//result1.print();//result2.print();//result3.print();//result4.print();//result5.print();//result6.print();result7.print();//5.executeenv.execute();}
}

總結

以上是生活随笔為你收集整理的2021年大数据Flink(十二):流批一体API Transformation的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。