Flink API: Environment, Input Sources, and Operator Transformation Flow
Flink Environment
getExecutionEnvironment()
Obtains the execution environment appropriate for the current runtime context (local or cluster). If no parallelism is set, the parallelism configured in flink-conf.yaml is used, which defaults to 1.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

createLocalEnvironment()
Creates a local environment. Parallelism defaults to the number of CPU cores and can also be set via a constructor argument.

LocalStreamEnvironment localEnvironment = StreamExecutionEnvironment.createLocalEnvironment();

createRemoteEnvironment()
Creates a remote environment and submits the jar to it for execution.

StreamExecutionEnvironment remoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 7777, "/home/WordCount.jar");

Flink Input Sources
To use Kafka as an input source, add the connector dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

Then register the consumer as a source:

env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
Collection-based and user-defined sources are mainly used for testing, with mock data defined directly in code.
The full example code is as follows:
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

/**
 * @author regotto
 */
public class SourceTest {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    private static void readFromCollectionAndElement() {
        // Read from a collection; SensorReading is a custom POJO (String id, Long timestamp, Double temperature)
        DataStream<SensorReading> dataStream = env.fromCollection(Arrays.asList(
                new SensorReading("1", 1111L, 35.1),
                new SensorReading("2", 2222L, 32.1),
                new SensorReading("3", 3333L, 33.1),
                new SensorReading("4", 12345L, 36.1)));
        DataStreamSource<Integer> elements = env.fromElements(1, 2, 3, 4, 5);
        dataStream.print("data");
        elements.print("int");
    }

    private static void readFromText() {
        DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");
        dataStream.print();
    }

    private static void readFromKafka() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
        dataStream.print();
    }

    /**
     * User-defined source
     */
    private static void readFromUserDefine() {
        // Implement the SourceFunction interface: generate data in run() and emit it with ctx.collect()
        DataStream<SensorReading> dataStream = env.addSource(new SourceFunction<SensorReading>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<SensorReading> ctx) throws Exception {
                Random random = new Random();
                while (running) {
                    for (int i = 0; i < 10; i++) {
                        ctx.collect(new SensorReading(i + "", System.currentTimeMillis(), random.nextGaussian()));
                    }
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });
        dataStream.print();
    }

    public static void main(String[] args) throws Exception {
        readFromUserDefine();
        env.execute();
    }
}

Transform
Mapping operators
map, flatMap, filter: transform, expand, or drop individual elements.
Aggregation operators
Rolling aggregations on a KeyedStream: sum(), min(), max(), minBy(), maxBy(). Note that min()/max() only update the aggregated field, while minBy()/maxBy() return the whole record containing the extreme value (see the sketch after this list).
Multi-stream operators
split: tags the elements of a DataStream.
select: pulls tagged elements out of a SplitStream into separate DataStreams.
connect: wraps two streams together; internally each stream keeps its own type and state, and the two remain independent of each other.
coMap/coFlatMap: applies map/flatMap to a connected stream to merge the two sides.
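The min/max vs. minBy/maxBy distinction is easy to trip over. Below is a minimal sketch; it reuses the SensorReading POJO from the examples in this post, and the class name MaxVsMaxBy is made up for illustration:

import com.regotto.entity.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxVsMaxBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Two readings for the same sensor; the later one (timestamp 200) is hotter.
        DataStream<SensorReading> sensors = env.fromElements(
                new SensorReading("1", 100L, 35.0),
                new SensorReading("1", 200L, 36.0));
        KeyedStream<SensorReading, String> keyed = sensors.keyBy(SensorReading::getId);
        // max("temperature") only updates the aggregated field; the other fields
        // stay as in the first record, so the last output is (1, 100, 36.0).
        keyed.max("temperature").print("max");
        // maxBy("temperature") emits the whole record holding the maximum,
        // so the last output is (1, 200, 36.0).
        keyed.maxBy("temperature").print("maxBy");
        env.execute();
    }
}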
The full example code is as follows:
import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;

import java.util.Collections;

/**
 * @author regotto
 */
public class TransformTest {

    private static StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    public static void main(String[] args) throws Exception {
        DataStream<String> dataStream = env.readTextFile("D:\\sensor.txt");
        env.setParallelism(1);

        // map: wrap each input line into a SensorReading
        DataStream<SensorReading> map = dataStream.map(value -> {
            String[] fields = value.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });
        map.print("map");

        // flatMap: break each line apart and emit the individual fields
        dataStream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                for (String s : value.split(",")) {
                    out.collect(s);
                }
            }
        }).print("flatMap");

        // filter
        dataStream.filter((FilterFunction<String>) value -> value.startsWith("1")).print("filter");

        // rolling aggregation of the current max temperature; keyBy accepts a
        // field position, a field name, or a custom KeySelector
        KeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);
        keyedStream.max("temperature").print("max temperature");

        // reduce: max temperature combined with the latest timestamp
        keyedStream.reduce(new ReduceFunction<SensorReading>() {
            @Override
            public SensorReading reduce(SensorReading curData, SensorReading newData) throws Exception {
                return new SensorReading(curData.getId(), newData.getTimestamp(),
                        Math.max(curData.getTemperature(), newData.getTemperature()));
            }
        }).print("latest timestamp at max temperature");

        // split & select: divide the data into high- and low-temperature streams
        SplitStream<SensorReading> splitStream = keyedStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading value) {
                return value.getTemperature() > 36 ?
                        Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");
        DataStream<SensorReading> all = splitStream.select("high", "low");
        high.print("high-temperature stream");
        low.print("low-temperature stream");
        all.print("all");

        // connect & coMap: map the high stream to tuples, connect it with the
        // low stream, and emit status information for each side
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStream =
                high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
                    @Override
                    public Tuple2<String, Double> map(SensorReading value) throws Exception {
                        return new Tuple2<>(value.getId(), value.getTemperature());
                    }
                }).connect(low);
        connectedStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temperature alert");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "temperature normal");
            }
        }).print("connect&coMap");

        // union: merge high, low, and all (the streams must share the same type)
        high.union(low, all).print("union");

        env.execute();
    }
}

[Figure: operator transformation diagram]
RichMapFunction
An enhanced version of MapFunction that provides access to the RuntimeContext. A runtime context corresponds to one parallel subtask (partition); open() and close() are invoked when each subtask is created and destroyed, so resource setup and teardown belong there. Extend RichMapFunction and override open() and close() to pre-process and release resources, which makes the operation more flexible. The other RichXXX functions work the same way.
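A minimal sketch of the pattern, assuming the same SensorReading POJO; the mapper class and its printed messages are illustrative, not from the original post:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class MyRichMapper extends RichMapFunction<SensorReading, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel subtask before any record is processed:
        // acquire resources here (connections, caches, ...).
        System.out.println("open, subtask " + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public String map(SensorReading value) throws Exception {
        // Ordinary per-record logic, now with access to getRuntimeContext().
        return value.getId();
    }

    @Override
    public void close() throws Exception {
        // Called once per parallel subtask on shutdown: release resources here.
        System.out.println("close");
    }
}

It plugs in like any plain MapFunction, e.g. dataStream.map(new MyRichMapper()).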
Problems Encountered
While writing the functions, shortening an anonymous inner class to a lambda expression ran into Java generic type erasure, producing this error:
The generic type parameters of 'Collector' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface. Otherwise the type has to be specified explicitly using type information.
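Besides switching back to an anonymous class as the message suggests, the lambda can be kept if the result type is declared explicitly. A sketch against the flatMap from TransformTest above (requires org.apache.flink.api.common.typeinfo.Types):

// Lambda version of the flatMap from TransformTest; without the trailing
// returns(...) call, Flink cannot recover the erased Collector<String> type.
dataStream
        .flatMap((String value, Collector<String> out) -> {
            for (String s : value.split(",")) {
                out.collect(s);
            }
        })
        .returns(Types.STRING)
        .print("flatMap");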
Summary
When data moves through computations and transformations, always be clear about what the input is and what the output is.