日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

侧输出流简单应用-打印的完整流程

發布時間:2024/2/28 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 侧输出流简单应用-打印的完整流程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

1.添加運行環境和設置時間語義

如果是遲到數據處理就只能在事件時間語義下使用,如果是一般數據使用側輸出流就看業務需求是按什么條件進行分流eg:如果按照數據中的溫度進行劃分高溫流和低溫流,可以直接使用處理時間語義(默認值,不需要設置)。如果業務需要數據中的下單時間進行相關統計,就需要開啟事件時間

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設置全局并行度 env.setParallelism(1); //開啟事件時間 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

2.事件時間語義下需要設置watermark

為流設置watermark,如果是亂序數據就需要使用BoundOutOfOrdernessTimestampExtractor,如果時間是嚴格遞增,則使用AscendingTimestampExtractor
BoundOutOfOrdernessTimestampExtractor需要設置延遲時間,一般取逆序時間的最大值eg:01:00 02:01 01:50 01:20…當前最大逆序為2:01和1:20即41s

//為流設置watermark,如果是亂序數據就需要使用BoundOutOfOrdernessTimestampExtractor,如果時間是嚴格遞增的就可以使用 //AscendingTimestampExtractor DataStreamSource<String> inputStream = env.readTextFile("hello.csv"); DataStream<UserBehavior> mapStream = inputStream.map(line -> {String[] split = line.split(",");return new Person(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]),split[3], Long.valueOf(split[4]));}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Person>(Time.seconds(2L)) {//設置延遲時間為2s@Overridepublic long extractTimestamp(Person element) {return element.getTimestamp()*1000;//每個數據的實際事件時間,這里乘1000是因為原始數據使//用的時間戳是s,需要轉換為毫秒}})

3.設置Output標簽

OutputTag<Person> outputtag=new Output<>("hello"){};

4.開啟側輸出流

  • 遲到數據可以使用特殊的側輸出流形式
//1).開啟窗口 DataStream<Person> mainStream=mapStream.keyBy("id).timeWindow(Time.seconds(15)) //2).設置允許的延遲數據時間 .allowedLateness(Time.minutes(1)) //添加側輸出流標簽 .sideOutputLateData(outputtag);

大招:所有想要使用側輸出流的情況都可以使用底層函數process

DataStream<String> mainStream=mapStream.keyBy("id).timeWindow(Time.seconds(15)).process(new MyProcessFunction());//通過process獲取到的流數據的上下文設置側輸出流 public static class MyProcessFunction extends KeyedProcessFunction<String,Person,String>{@Overridepublic void processElement(Person value, Context ctx, Collector<String> out) throws Exception {if(value.getTimestamp<10000000){//假設我們將時間戳小于10000000作為分界,分到兩個流中ctx.output(outputtag,value);}else{out.collect(value);}} }

5.使用側輸出流

//簡單應用-打印: //獲取側輸出流 DataStream<Person> sideOutput = mainStream.getSideOutput(outputtag); //打印 sideOutput.print("sideoutputstream"); //主流打印 mainStream。print("mainstream"); //執行任務 env.execute(); 超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生

總結

以上是生活随笔為你收集整理的侧输出流简单应用-打印的完整流程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。