

Flink API: Environment, Input Sources, and Operator Transformation Flow


Flink Environment

  • getExecutionEnvironment()

    Returns the execution environment appropriate for the current platform. If no parallelism is set explicitly, the parallelism configured in flink-conf.yaml is used, which defaults to 1.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  • createLocalEnvironment()

    Creates a local environment. Parallelism defaults to the number of CPU cores and can also be set via a factory-method argument.

    LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
  • createRemoteEnvironment()

    Creates a remote environment that submits the given jar to a remote cluster for execution.

    StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 7777, "/home/WordCount.jar");
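All three factory methods return a StreamExecutionEnvironment, and a job follows the same skeleton regardless of which one is used: obtain the environment, declare the pipeline, then call execute(), which is what actually builds and submits the job graph. A minimal sketch (the file path is a placeholder):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentSkeleton {
    public static void main(String[] args) throws Exception {
        // Pick the environment appropriate for where the job runs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // overrides the flink-conf.yaml / CPU-core defaults

        // Declare the pipeline (a placeholder file source here)
        env.readTextFile("/home/test.txt").print();

        // Nothing runs until execute() is called
        env.execute("environment-skeleton");
    }
}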
Flink Input Sources

  • From a collection: env.fromCollection(new ArrayList<>()); or from individual elements: env.fromElements(1, 2, 3);
  • From a file: env.readTextFile("/home/test.txt");
  • From a message queue, e.g. Kafka. Add the connector dependency:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>

    then register the consumer as a source:

    env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

  • A user-defined source (implement the SourceFunction interface), mainly used to generate fake data for testing.
  • Full example:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

/**
 * @author regotto
 */
public class SourceTest {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    private static void readFromCollectionAndElement() {
        // Read from a collection; SensorReading is a custom entity (String id, Long timestamp, Double temperature)
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("1", 1111L, 35.1),
                new SensorReading("2", 2222L, 32.1),
                new SensorReading("3", 3333L, 33.1),
                new SensorReading("4", 12345L, 36.1)));
        DataStreamSource<Integer> elements = env.fromElements(1, 2, 3, 4, 5);
        dataStream.print("data");
        elements.print("int");
    }

    private static void readFromText() {
        DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");
        dataStream.print();
    }

    private static void readFromKafka() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
        dataStream.print();
    }

    /**
     * User-defined source
     */
    private static void readFromUserDefine() {
        // Implement SourceFunction: generate the data in run() and emit it with collect()
        DataStream<SensorReading> dataStream = env.addSource(new SourceFunction<SensorReading>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<SensorReading> ctx) throws Exception {
                Random random = new Random();
                while (running) {
                    for (int i = 0; i < 10; i++) {
                        ctx.collect(new SensorReading(i + "", System.currentTimeMillis(), random.nextGaussian()));
                    }
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        dataStream.print();
    }

    public static void main(String[] args) throws Exception {
        readFromUserDefine();
        env.execute();
    }
}

Transform

Mapping operators

  • map: maps each element to exactly one output element
  • flatMap: splits each element apart and emits zero or more mapped elements
  • filter: keeps only the elements that satisfy a predicate
Aggregation operators

  • keyBy: hash-partitions a stream into disjoint partitions so that all records with the same key end up in the same partition.
    Rolling aggregations on a keyed stream: sum, min, max, minBy, maxBy. min/max update only the aggregated field, while minBy/maxBy return the whole record holding the extreme value (sketched right after this list).
  • reduce: combines the current element with the result of the previous aggregation and emits the running result for every incoming element.
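The max/maxBy difference shows up in the non-aggregated fields. A brief sketch, assuming keyedStream is a KeyedStream<SensorReading, String> keyed by sensor id as in the example further below:

// max("temperature") updates only the temperature field; the other fields of the
// emitted records keep the values first seen for that key.
keyedStream.max("temperature").print("max");

// maxBy("temperature") emits the whole record that currently holds the maximum,
// so its timestamp really belongs to the hottest reading.
keyedStream.maxBy("temperature").print("maxBy");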
Multi-stream operators

  • split and select: divide one DataStream into two or more DataStreams by some criterion (deprecated in later Flink releases; see the side-output note after the example below).
    split: stamps each record of the DataStream with one or more tags.
    select: pulls the records carrying a given tag out into their own DataStream.
  • connect and CoMap: wrap two DataStreams into one.
    connect: the two wrapped streams keep their own types and internal state and remain independent of each other.
    coMap/coFlatMap: apply a map/flatMap to each side of a connected stream to merge the two into a single output stream.
  • union: merges two or more DataStreams of the same type into one stream.
  • Full example:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;

/**
 * @author regotto
 */
public class TransformTest {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static void main(String[] args) throws Exception {
        DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");
        env.setParallelism(1);

        // map: wrap each line into a SensorReading
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        map.print("map");

        // flatMap: split each line and emit the pieces individually
        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String s : value.split(",")) {
                    out.collect(s);
                }
            }
        }).print("flatMap");

        // filter
        dataStream.filter((FilterFunction<String>) value -> value.startsWith("1")).print("filter");

        // rolling aggregation of the current max temperature; keyBy accepts a position,
        // a field name, or a custom KeySelector
        KeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);
        keyedStream.max("temperature").print("max temperature");

        // reduce: max temperature combined with the latest timestamp
        keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading curData, SensorReading newData) throws Exception {
                return new SensorReading(curData.getId(), newData.getTimestamp(),
                        Math.max(curData.getTemperature(), newData.getTemperature()));
            }
        }).print("latest time at max temperature");

        // split & select: divide the records into high- and low-temperature streams
        SplitStream<SensorReading> splitStream = keyedStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 36
                        ? Collections.singletonList("high")
                        : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");
        DataStream<SensorReading> all = splitStream.select("high", "low");
        high.print("high");
        low.print("low");
        all.print("all");

        // connect & coMap: map the high stream to tuples, connect it with the low
        // stream, and emit a status message for each side
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream =
                high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                }).connect(low);
        connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temperature alert");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "temperature normal");
            }
        }).print("connect&coMap");

        // union: merge the high, low and all streams
        high.union(low, all).print("union");

        env.execute();
    }
}
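Note that split/select, used above, were deprecated and later removed from Flink; side outputs are the recommended replacement. A minimal sketch of the same high/low split with a ProcessFunction and an OutputTag, reusing the map stream from the example above (the tag name is illustrative):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// The OutputTag is created as an anonymous subclass so its type parameter survives erasure
final OutputTag<SensorReading> lowTag = new OutputTag<SensorReading>("low") {};

SingleOutputStreamOperator<SensorReading> highStream = map.process(
        new ProcessFunction<SensorReading, SensorReading>() {
            @Override
            public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) {
                if (value.getTemperature() > 36) {
                    out.collect(value);        // main output: high temperatures
                } else {
                    ctx.output(lowTag, value); // side output: everything else
                }
            }
        });
DataStream<SensorReading> lowStream = highStream.getSideOutput(lowTag);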

    算子運算轉化圖:

RichMapFunction

An enhancement of MapFunction: it can access a RuntimeContext describing the parallel instance (subtask) the function runs in. Each parallel instance executes open() when it is created and close() when it is destroyed, so subclassing RichMapFunction and overriding open and close is the place to prepare and release resources. This makes operations considerably more flexible; the other RichXXX function variants work the same way.
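A minimal sketch of that lifecycle (the println calls stand in for real resource setup and teardown):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichingMapper extends RichMapFunction<SensorReading, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Runs once per parallel instance before the first map() call:
        // open connections, load caches, etc.
        System.out.println("open subtask " + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public String map(SensorReading value) throws Exception {
        // The RuntimeContext is available in map() as well
        return value.getId() + "@" + getRuntimeContext().getIndexOfThisSubtask();
    }

    @Override
    public void close() throws Exception {
        // Runs once per parallel instance during shutdown: release resources here
        System.out.println("close subtask");
    }
}

It plugs in wherever a plain MapFunction would: dataStream.map(new EnrichingMapper()).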

Problems encountered

Rewriting an anonymous inner class as a lambda caused a generic type erasure error:

The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
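Two fixes work: keep the anonymous class, as the message suggests, or keep the lambda and state the missing type explicitly with returns(). A sketch of the second option, applied to the flatMap from the transform example:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.util.Collector;

// The lambda loses Collector<String>'s type parameter to erasure;
// returns(...) supplies the output type information explicitly.
dataStream.flatMap((String value, Collector<String> out) -> {
    for (String s : value.split(",")) {
        out.collect(s);
    }
}).returns(Types.STRING).print("flatMap");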

Summary

When data flows through computations and transformations, always be clear about what each step's input is and what its output is.
