侧输出流简单应用-打印的完整流程
生活随笔
收集整理的這篇文章主要介紹了
侧输出流简单应用-打印的完整流程
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1.添加運行環境和設置時間語義
如果是遲到數據處理就只能在事件時間語義下使用,如果是一般數據使用側輸出流就看業務需求是按什么條件進行分流eg:如果按照數據中的溫度進行劃分高溫流和低溫流,可以直接使用處理時間語義(默認值,不需要設置)。如果業務需要數據中的下單時間進行相關統計,就需要開啟事件時間
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設置全局并行度 env.setParallelism(1); //開啟事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);2.事件時間語義下需要設置watermark
為流設置watermark,如果是亂序數據就需要使用BoundOutOfOrdernessTimestampExtractor,如果時間是嚴格遞增,則使用AscendingTimestampExtractor
BoundOutOfOrdernessTimestampExtractor需要設置延遲時間,一般取逆序時間的最大值eg:01:00 02:01 01:50 01:20…當前最大逆序為2:01和1:20即41s
3.設置Output標簽
OutputTag<Person> outputtag=new Output<>("hello"){};4.開啟側輸出流
- 遲到數據可以使用特殊的側輸出流形式
大招:所有想要使用側輸出流的情況都可以使用底層函數process
DataStream<String> mainStream=mapStream.keyBy("id).timeWindow(Time.seconds(15)).process(new MyProcessFunction());//通過process獲取到的流數據的上下文設置側輸出流 public static class MyProcessFunction extends KeyedProcessFunction<String,Person,String>{@Overridepublic void processElement(Person value, Context ctx, Collector<String> out) throws Exception {if(value.getTimestamp<10000000){//假設我們將時間戳小于10000000作為分界,分到兩個流中ctx.output(outputtag,value);}else{out.collect(value);}} }5.使用側輸出流
//簡單應用-打印: //獲取側輸出流 DataStream<Person> sideOutput = mainStream.getSideOutput(outputtag); //打印 sideOutput.print("sideoutputstream"); //主流打印 mainStream。print("mainstream"); //執行任務 env.execute(); 超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的侧输出流简单应用-打印的完整流程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hbase建表,删表,修改,查询(get
- 下一篇: Flink专题-Source