日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等

發(fā)布時間:2024/9/27 编程问答 33 豆豆

1.16.Flink Window和Time詳解
1.16.1.Window(窗口)
1.16.2.Window的類型
1.16.3.Window類型匯總
1.16.4.TimeWindow的應用
1.16.5.CountWindow的應用
1.16.6.Window聚合分類
1.16.7.Window聚合分類之增量聚合
1.16.7.1.增量聚合狀態(tài)變化過程-累加求和
1.16.7.2.reduce(reduceFunction)
1.16.7.3.aggregate(aggregateFunction)
1.16.8.Window聚合分類之全量聚合
1.16.8.1.全量聚合狀態(tài)變化過程-求最大值
1.16.8.2.apply(windowFunction)
1.16.8.3.process(processWindowFunction)
1.16.9.Time介紹
1.16.9.1.設置Time類型
1.16.9.2.EventTime和Watermarks
1.16.9.3.有序的流的watermarks
1.16.9.4.無序的流的watermarks
1.16.9.5.多并行度流的watermarks
1.16.9.6.watermarks的生成方式
1.16.9.7.Flink應該如何設置最大亂序時間?
1.16.9.8.Flink應該如何設置最大亂序時間?

1.16.Flink Window和Time詳解

1.16.1.Window(窗口)

?聚合事件(比如計數、求和)在流上的工作方式與批處理不同。

  • ?比如,對流中的所有元素進行計數是不可能的,因為通常流是無限的(無界的)。所以,流上的聚合需要由 window 來劃定范圍,比如 “計算過去的5分鐘” ,或者 “最后100個元素的和”。
  • ?window是一種可以把無限數據切割為有限數據塊的手段。
    ?窗口可以是 時間驅動的 【Time Window】(比如:每30秒)或者 數據驅動的【Count Window】 (比如:每100個元素)。

1.16.2.Window的類型

?窗口通常被區(qū)分為不同的類型:
一:tumbling windows:滾動窗口 【沒有重疊】

二:sliding windows:滑動窗口 【有重疊】

三:session windows:會話窗口

1.16.3.Window類型匯總

TimeWindow和CountWindow都可以有tumbling windows和sliding wndows

1.16.4.TimeWindow的應用

1.16.5.CountWindow的應用

1.16.6.Window聚合分類

?增量聚合
?全量聚合

import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;/*** window** Created by xxxx on 2020/10/09 .*/ public class SocketDemoFullCount {public static void main(String[] args) throws Exception{//獲取需要的端口號int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//獲取flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname = "hadoop100";String delimiter = "\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer,Integer> map(String value) throws Exception {return new Tuple2<>(1,Integer.parseInt(value));}});intData.keyBy(0).timeWindow(Time.seconds(5)).process(new ProcessWindowFunction<Tuple2<Integer,Integer>, String, Tuple, TimeWindow>() {@Overridepublic void process(Tuple key, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<String> out)throws Exception {System.out.println("執(zhí)行process。。。");long count = 0;for(Tuple2<Integer,Integer> element: elements){count++;}out.collect("window:"+context.window()+",count:"+count);}}).print();//這一行代碼一定要實現,否則程序不執(zhí)行env.execute("Socket window count");}} import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time;/*** window** Created by xxxx on 2020/10/09 .*/ public class SocketDemoIncrAgg {public static void main(String[] args) throws Exception{//獲取需要的端口號int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//獲取flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname = "hadoop100";String delimiter = "\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);DataStream<Tuple2<Integer,Integer>> intData = text.map(new MapFunction<String, Tuple2<Integer,Integer>>() {@Overridepublic Tuple2<Integer,Integer> map(String value) throws Exception {return new Tuple2<>(1,Integer.parseInt(value));}});intData.keyBy(0).timeWindow(Time.seconds(5)).reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {@Overridepublic Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {System.out.println("執(zhí)行reduce操作:"+value1+","+value2);return new Tuple2<>(value1.f0,value1.f1+value2.f1);}}).print();//這一行代碼一定要實現,否則程序不執(zhí)行env.execute("Socket window count");}} import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;/*** 滑動窗口計算** 通過socket模擬產生單詞數據* flink對數據進行統(tǒng)計計算** 需要實現每隔1秒對最近2秒內的數據進行匯總計算*** Created by xxxx on 2020/10/09 .*/ public class SocketWindowWordCountJava {public static void main(String[] args) throws Exception{//獲取需要的端口號int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//獲取flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String hostname = "hadoop100";String delimiter = "\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// a a c// a 1// a 1// c 1DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word : splits) {out.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))//指定時間窗口大小為2秒,指定時間間隔為1秒.sum("count");//在這里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把數據打印到控制臺并且設置并行度windowCounts.print().setParallelism(1);//這一行代碼一定要實現,否則程序不執(zhí)行env.execute("Socket window count");}public static class WordWithCount{public String word;public long count;public WordWithCount(){}public WordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}} import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector;/*** checkpoint** Created by xxxx on 2020/10/09 .*/ public class SocketWindowWordCountJavaCheckPoint {public static void main(String[] args) throws Exception{//獲取需要的端口號int port;try {ParameterTool parameterTool = ParameterTool.fromArgs(args);port = parameterTool.getInt("port");}catch (Exception e){System.err.println("No port set. use default port 9000--java");port = 9000;}//獲取flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1000 ms進行啟動一個檢查點【設置checkpoint的周期】env.enableCheckpointing(1000);// 高級選項:// 設置模式為exactly-once (這是默認值)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一時間只允許進行一個檢查點env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint【詳細解釋見備注】//ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel后,會保留Checkpoint數據,以便根據實際需要恢復到指定的Checkpoint//ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel后,會刪除Checkpoint數據,只有job執(zhí)行失敗的時候才會保存checkpointenv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//設置statebackend//env.setStateBackend(new MemoryStateBackend());//env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints"));//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));String hostname = "hadoop100";String delimiter = "\n";//連接socket獲取輸入的數據DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);// a a c// a 1// a 1// c 1DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {public void flatMap(String value, Collector<WordWithCount> out) throws Exception {String[] splits = value.split("\\s");for (String word : splits) {out.collect(new WordWithCount(word, 1L));}}}).keyBy("word").timeWindow(Time.seconds(2), Time.seconds(1))//指定時間窗口大小為2秒,指定時間間隔為1秒.sum("count");//在這里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把數據打印到控制臺并且設置并行度windowCounts.print().setParallelism(1);//這一行代碼一定要實現,否則程序不執(zhí)行env.execute("Socket window count");}public static class WordWithCount{public String word;public long count;public WordWithCount(){}public WordWithCount(String word,long count){this.word = word;this.count = count;}@Overridepublic String toString() {return "WordWithCount{" +"word='" + word + '\'' +", count=" + count +'}';}}} import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;/*** 把collection集合作為數據源** Created by xxxx on 2020/10/09 on 2018/10/23.*/ public class StreamingFromCollection {public static void main(String[] args) throws Exception {//獲取Flink的運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();ArrayList<Integer> data = new ArrayList<>();data.add(10);data.add(15);data.add(20);//指定數據源DataStreamSource<Integer> collectionData = env.fromCollection(data);//通map對數據進行處理DataStream<Integer> num = collectionData.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer value) throws Exception {return value + 1;}});//直接打印num.print().setParallelism(1);env.execute("StreamingFromCollection");} }

另外的Scala案例:

import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.windowing.time.Time/*** 滑動窗口計算** 每隔1秒統(tǒng)計最近2秒內的數據,打印到控制臺** Created by xxxx on 2020/10/09 .*/ object SocketWindowWordCountScala {def main(args: Array[String]): Unit = {//獲取socket端口號val port: Int = try {ParameterTool.fromArgs(args).getInt("port")}catch {case e: Exception => {System.err.println("No port set. use default port 9000--scala")}9000}//獲取運行環(huán)境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment//鏈接socket獲取輸入數據val text = env.socketTextStream("hadoop100",port,'\n')//解析數據(把數據打平),分組,窗口計算,并且聚合求sum//注意:必須要添加這一行隱式轉行,否則下面的flatmap方法執(zhí)行會報錯import org.apache.flink.api.scala._val windowCounts = text.flatMap(line => line.split("\\s"))//打平,把每一行單詞都切開.map(w => WordWithCount(w,1))//把單詞轉成word , 1這種形式.keyBy("word")//分組.timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定間隔時間.sum("count");// sum或者reduce都可以//.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))//打印到控制臺windowCounts.print().setParallelism(1);//執(zhí)行任務env.execute("Socket window count");}case class WordWithCount(word: String,count: Long)} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment/*** Created by xxxx on 2020/10/09 on 2018/10/23.*/ object StreamingFromCollectionScala {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironment//隱式轉換import org.apache.flink.api.scala._val data = List(10,15,20)val text = env.fromCollection(data)//針對map接收到的數據執(zhí)行加1的操作val num = text.map(_+1)num.print().setParallelism(1)env.execute("StreamingFromCollectionScala")}}

1.16.7.Window聚合分類之增量聚合

窗口中每進入一條數據,就進行一次計算

reduce(reduceFunction) aggregate(aggregateFunction) sum(),min(),max()

1.16.7.1.增量聚合狀態(tài)變化過程-累加求和

1.16.7.2.reduce(reduceFunction)

1.16.7.3.aggregate(aggregateFunction)

1.16.8.Window聚合分類之全量聚合

?全量聚合

  • ?等屬于窗口的數據到齊,才開始進行聚合計算【可以實現對窗口內的數據進行排序等需求】
  • ?apply(windowFunction)
  • ?process(processWindowFunction)
    ?processWindowFunction比windowFunction提供了更多的上下文信息。

1.16.8.1.全量聚合狀態(tài)變化過程-求最大值

1.16.8.2.apply(windowFunction)

1.16.8.3.process(processWindowFunction)


1.16.9.Time介紹

?針對stream數據中的時間,可以分為以下三種

  • ?Event Time:事件產生的時間,它通常由事件中的時間戳描述。
  • ?Ingestion time:事件進入Flink的時間
  • ?Processing Time:事件被處理時當前系統(tǒng)的時間。

    ?處理時間(processing time):處理時間是指執(zhí)行相應操作的機器的系統(tǒng)時間。
    當流處理程序基于處理時間運行時,所有基于時間的操作(如時間窗口)將使用運行相應運算符的機器的系統(tǒng)時鐘。每小時處理時間窗口將包括在系統(tǒng)時鐘指示整個小時之間到達特定運算符的所有記錄。 例如,如果應用程序在上午9:15開始運行,則第一個每小時處理時間窗口將包括在上午9:15到10:00之間處理的事件,下一個窗口將包括在上午10:00到11:00之間處理的事件,以此類推。

處理時間是最簡單的時間概念,不需要流和機器之間的協(xié)調。 它提供最佳性能和最低延遲。 但是,在分布式和異步環(huán)境中,處理時間不提供確定性,因為它容易受到記錄到達系統(tǒng)的速度(例如從消息隊列),記錄在系統(tǒng)內的運算符之間流動的速度的影響,以及停電(計劃或其他)。
?事件時間(event time):事件時間是每個事件在其生產設備上發(fā)生的時間。此時間通常在進入Flink之前嵌入記錄中,并且可以從每個記錄中提取該事件時間戳。 在事件時間,時間的進展取決于數據,而不是任何時鐘。 事件時間程序必須指定如何生成事件時間水印,這是表示事件時間進度的機制。 該水印機制在下面的后面部分中描述。

在一個完美的世界中,事件時間處理將產生完全一致和確定的結果,無論事件何時到達或其它們的順序。 但是,除非事件已知按順序到達(按時間戳),否則事件時間處理會在等待無序事件時產生一些延遲。 由于只能等待一段有限的時間,因此限制了確定性事件時間應用程序的運行方式。

假設所有數據都已到達,事件時間操作將按預期運行,即使在處理無序或延遲事件或重新處理歷史數據時也會產生正確且一致的結果。 例如,每小時事件時間窗口將包含帶有落入該小時的事件時間戳的所有記錄,無論它們到達的順序如何,或者何時處理它們。 (有關更多信息,請參閱有關遲到事件的部分。)

請注意,有時基于事件時間的程序處理實時數據時,它們將使用一些處理時間(processing time)操作,以保證它們及時進行。
?進入時間(Ingestion time): 進入時間是事件進入Flink的時間。 在源運算符處,每個記錄將源的當前時間作為時間戳,并且基于時間的操作(如時間窗口)引用該時間戳。
進入時間在概念上位于事件時間和處理時間之間。與處理時間相比,它代價稍高,但可以提供更可預測的結果。 因為進入時間使用穩(wěn)定的時間戳(在源處分配一次),所以對記錄的不同窗口操作將引用相同的時間戳,而在處理時間中,每個窗口操作符可以將記錄分配給不同的窗口(基于本地系統(tǒng)時鐘和 任何傳輸延誤)。

與事件時間相比,進入時間程序無法處理任何無序事件或延遲數據,但程序不必指定如何生成水印。

在內部,攝取時間與事件時間非常相似,但具有自動分配時間戳和自動生成水印功能。

1.16.9.1.設置Time類型

?Flink中,默認Time類似是ProcessingTime
?可以在代碼中設置

1.16.9.2.EventTime和Watermarks

?在使用eventTime的時候如何處理亂序數據?
?我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由于網絡延遲等原因,導致亂序的產生,特別是使用kafka的話,多個分區(qū)的數據無法保證有序。所以在進行window計算的時候,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間后,必須觸發(fā)window去進行計算了。這個特別的機制,就是watermark,watermark是用于處理亂序事件的。
?watermark可以翻譯為水位線

1.16.9.3.有序的流的watermarks

1.16.9.4.無序的流的watermarks

1.16.9.5.多并行度流的watermarks

注意:多并行度的情況下,watermark對齊會取所有channel最小的watermark

1.16.9.6.watermarks的生成方式

?通常,在接收到source的數據后,應該立刻生成watermark;但是,也可以在source后,應用簡單的map或者filter操作后,再生成watermark。
?注意:如果指定多次watermark,后面指定的會覆蓋前面的值。
?生成方式

  • ?With Periodic Watermarks
    1、周期性的觸發(fā)watermark的生成和發(fā)送,默認是100ms
    2、每隔N秒自動向流里注入一個WATERMARK 時間間隔由ExecutionConfig.setAutoWatermarkInterval 決定. 每次調用getCurrentWatermark 方法, 如果得到的WATERMARK 不為空并且比之前的大就注入流中。
    3、可以定義一個最大允許亂序的時間,這種比較常用
    4、實現AssignerWithPeriodicWatermarks接口

  • ?With Punctuated Watermarks
    1、基于某些事件觸發(fā)watermark的生成和發(fā)送
    2、基于事件向流里注入一個WATERMARK,每一個元素都有機會判斷是否生成一個WATERMARK. 如果得到的WATERMARK 不為空并且比之前的大就注入流中。
    3、實現AssignerWithPunctuatedWatermarks接口

import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import javax.annotation.Nullable; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List;/**** Watermark 案例** Created by xxxx on 2020/10/09.*/ public class StreamingWindowWatermark {public static void main(String[] args) throws Exception {//定義socket的端口號int port = 9000;//獲取運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設置使用eventtime,默認是使用processtimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//設置并行度為1,默認并行度是當前機器的cpu數量env.setParallelism(1);//連接socket獲取輸入的數據DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");//解析輸入的數據DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple2<>(arr[0], Long.parseLong(arr[1]));}});//抽取timestamp和生成watermarkDataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10sSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");/*** 定義生成watermark的邏輯* 默認100ms被調用一次*/@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}//定義如何提取timestamp@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);long id = Thread.currentThread().getId();System.out.println("currentThreadId:"+id+",key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");return timestamp;}});DataStream<String> window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調用TimeWindow效果一樣.apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {/*** 對window內的數據進行排序,保證數據的順序* @param tuple* @param window* @param input* @param out* @throws Exception*/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {String key = tuple.toString();List<Long> arrarList = new ArrayList<Long>();Iterator<Tuple2<String, Long>> it = input.iterator();while (it.hasNext()) {Tuple2<String, Long> next = it.next();arrarList.add(next.f1);}Collections.sort(arrarList);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());out.collect(result);}});//測試-把結果打印到控制臺即可window.print();//注意:因為flink是懶加載的,所以必須調用execute方法,上面的代碼才會執(zhí)行env.execute("eventtime-watermark");}} import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag;import javax.annotation.Nullable; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List;/**** Watermark 案例** sideOutputLateData 收集遲到的數據** Created by xxxx on 2020/10/09.*/ public class StreamingWindowWatermark2 {public static void main(String[] args) throws Exception {//定義socket的端口號int port = 9000;//獲取運行環(huán)境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//設置使用eventtime,默認是使用processtimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//設置并行度為1,默認并行度是當前機器的cpu數量env.setParallelism(1);//連接socket獲取輸入的數據DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");//解析輸入的數據DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple2<>(arr[0], Long.parseLong(arr[1]));}});//抽取timestamp和生成watermarkDataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {Long currentMaxTimestamp = 0L;final Long maxOutOfOrderness = 10000L;// 最大允許的亂序時間是10sSimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");/*** 定義生成watermark的邏輯* 默認100ms被調用一次*/@Nullable@Overridepublic Watermark getCurrentWatermark() {return new Watermark(currentMaxTimestamp - maxOutOfOrderness);}//定義如何提取timestamp@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {long timestamp = element.f1;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");return timestamp;}});//保存被丟棄的數據OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};//注意,由于getSideOutput方法是SingleOutputStreamOperator子類中的特有方法,所以這里的類型,不能使用它的父類dataStream。SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和調用TimeWindow效果一樣//.allowedLateness(Time.seconds(2))//允許數據遲到2秒.sideOutputLateData(outputTag).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {/*** 對window內的數據進行排序,保證數據的順序* @param tuple* @param window* @param input* @param out* @throws Exception*/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {String key = tuple.toString();List<Long> arrarList = new ArrayList<Long>();Iterator<Tuple2<String, Long>> it = input.iterator();while (it.hasNext()) {Tuple2<String, Long> next = it.next();arrarList.add(next.f1);}Collections.sort(arrarList);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());out.collect(result);}});//把遲到的數據暫時打印到控制臺,實際中可以保存到其他存儲介質中DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);sideOutput.print();//測試-把結果打印到控制臺即可window.print();//注意:因為flink是懶加載的,所以必須調用execute方法,上面的代碼才會執(zhí)行env.execute("eventtime-watermark");}}

scala案例:

import java.text.SimpleDateFormatimport org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collectorimport scala.collection.mutable.ArrayBuffer import scala.util.Sorting/*** Watermark 案例* Created by xxxx on 2020/10/09*/ object StreamingWindowWatermarkScala {def main(args: Array[String]): Unit = {val port = 9000val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val text = env.socketTextStream("hadoop100",port,'\n')val inputMap = text.map(line=>{val arr = line.split(",")(arr(0),arr(1).toLong)})val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 10000L// 最大允許的亂序時間是10sval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)val id = Thread.currentThread().getIdprintln("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")timestamp}})val window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和調用TimeWindow效果一樣.apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]) = {val keyStr = key.toStringval arrBuf = ArrayBuffer[Long]()val ite = input.iteratorwhile (ite.hasNext){val tup2 = ite.next()arrBuf.append(tup2._2)}val arr = arrBuf.toArraySorting.quickSort(arr)val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last)+ "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)out.collect(result)}})window.print()env.execute("StreamingWindowWatermarkScala")} } import java.text.SimpleDateFormatimport org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment} import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collectorimport scala.collection.mutable.ArrayBuffer import scala.util.Sorting/*** Watermark 案例** sideOutputLateData 收集遲到的數據** Created by xxxx on 2020/10/09*/ object StreamingWindowWatermarkScala2 {def main(args: Array[String]): Unit = {val port = 9000val env = StreamExecutionEnvironment.getExecutionEnvironmentimport org.apache.flink.api.scala._env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.setParallelism(1)val text = env.socketTextStream("hadoop100",port,'\n')val inputMap = text.map(line=>{val arr = line.split(",")(arr(0),arr(1).toLong)})val waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {var currentMaxTimestamp = 0Lvar maxOutOfOrderness = 10000L// 最大允許的亂序時間是10sval sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");override def getCurrentWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness)override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long) = {val timestamp = element._2currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)val id = Thread.currentThread().getIdprintln("currentThreadId:"+id+",key:"+element._1+",eventtime:["+element._2+"|"+sdf.format(element._2)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+ sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp+"|"+sdf.format(getCurrentWatermark().getTimestamp)+"]")timestamp}})val outputTag = new OutputTag[Tuple2[String,Long]]("late-data"){}val window = waterMarkStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和調用TimeWindow效果一樣//.allowedLateness(Time.seconds(2))//允許數據遲到2秒.sideOutputLateData(outputTag).apply(new WindowFunction[Tuple2[String, Long], String, Tuple, TimeWindow] {override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]) = {val keyStr = key.toStringval arrBuf = ArrayBuffer[Long]()val ite = input.iteratorwhile (ite.hasNext){val tup2 = ite.next()arrBuf.append(tup2._2)}val arr = arrBuf.toArraySorting.quickSort(arr)val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last)+ "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)out.collect(result)}})val sideOutput: DataStream[Tuple2[String, Long]] = window.getSideOutput(outputTag)sideOutput.print()window.print()env.execute("StreamingWindowWatermarkScala")}}

1.16.9.7.Flink應該如何設置最大亂序時間?

這個要結合自己的業(yè)務以及數據情況去設置。如果maxOutOfOrderness設置的太小,而自身數據發(fā)送時由于網絡等原因導致亂序或者late太多,那么最終的結果就是會有很多單條的數據在window中被觸發(fā),數據的正確性影響太大。

對于嚴重亂序的數據,需要嚴格統(tǒng)計數據最大延遲時間,才能保證計算的數據準確,延時設置太小會影響數據準確性,延時設置太大不僅影響數據的實時性,更加會加重Flink作業(yè)的負擔,不是對eventTime要求特別嚴格的數據,盡量不要采用eventTime方式來處理,會有丟數據的風險。

1.16.9.8.Flink應該如何設置最大亂序時間?

這個要結合自己的業(yè)務以及數據情況去設置。如果maxOutOfOrderness設置的太小,而自身數據發(fā)送時由于網絡等原因導致亂序或者late太多,那么最終的結果就是會有很多單條的數據在window中被觸發(fā),數據的正確性影響太大。

對于嚴重亂序的數據,需要嚴格統(tǒng)計數據最大延遲時間,才能保證計算的數據準確,延時設置太小會影響數據準確性,延時設置太大不僅影響數據的實時性,更加會加重Flink作業(yè)的負擔,不是對eventTime要求特別嚴格的數據,盡量不要采用eventTime方式來處理,會有丟數據的風險。

與50位技術專家面對面20年技術見證,附贈技術全景圖

總結

以上是生活随笔為你收集整理的1.16.Flink Window和Time详解、TimeWindow的应用、Window聚合分类之全量聚合、全量聚合状态变化过程-求最大值、Time介绍、EventTime和Watermarks等的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。