2021 Big Data Flink (23): Watermark Example Demo

Published: 2023/11/28 · 豆豆
This article, collected and organized by 生活随笔, introduces 2021 Big Data Flink (23): Watermark Example Demo. We found it quite useful and are sharing it here for your reference.

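Contents

Watermark Example Demo

Requirements

API

Implementation 1: Development Version (must master)

Implementation 2: Verification Version (for understanding)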


Watermark Example Demo

Requirements

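There is order data in the format (order ID, user ID, order amount, timestamp/event time).

Every 5 s, compute each user's total order amount within those 5 s (an event-time tumbling window).

Add a Watermark so that a certain amount of data delay and out-of-order arrival is tolerated.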
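To make "every 5 s, within those 5 s" concrete: a tumbling window is found by rounding an order's event time down to a multiple of 5 s, and the window covers the half-open range [start, start + 5 s). The sketch below is plain Java with no Flink dependency; the class `TumblingWindowMath` is hypothetical and assumes window offset 0 (windows aligned to the epoch), which is what `TumblingEventTimeWindows.of(size)` uses. In Flink such a window is emitted once the watermark reaches its last millisecond, `end - 1`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical helper (not Flink API): which 5 s tumbling event-time window an order falls into.
 * Assumes offset 0, i.e. windows aligned to the epoch.
 */
public class TumblingWindowMath {
    static final long WINDOW_SIZE_MS = 5_000L;

    // Round the event time down to the nearest multiple of the window size.
    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, WINDOW_SIZE_MS);
    }

    // Windows are half-open: [start, start + size).
    static long windowEnd(long eventTimeMs) {
        return windowStart(eventTimeMs) + WINDOW_SIZE_MS;
    }

    public static void main(String[] args) {
        // Format in UTC so the printed times do not depend on the local time zone.
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss.SSS").withZone(ZoneOffset.UTC);
        long[] eventTimes = {12_000L, 14_999L, 15_000L, 19_500L};
        for (long t : eventTimes) {
            System.out.printf("event %s -> window [%s~%s)%n",
                    fmt.format(Instant.ofEpochMilli(t)),
                    fmt.format(Instant.ofEpochMilli(windowStart(t))),
                    fmt.format(Instant.ofEpochMilli(windowEnd(t))));
        }
    }
}
```

Orders at 14 999 ms and 15 000 ms land in different windows, [10 000, 15 000) and [15 000, 20 000), because the end bound is exclusive.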

API

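Note: in practice we do not write a watermark generator by hand; we directly use the bounded-out-of-orderness implementation that Flink provides. Older code uses BoundedOutOfOrdernessTimestampExtractor for this; since Flink 1.11 the recommended API is WatermarkStrategy, and the equivalent is WatermarkStrategy.forBoundedOutOfOrderness(...), which the development version below uses.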
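The rule behind that built-in strategy can be sketched in plain Java. This is not Flink code and the class `BoundedOutOfOrdernessSketch` is hypothetical, but its arithmetic (remember the largest event time seen, then emit that value minus the allowed out-of-orderness minus 1 ms) is the same as in `WatermarkStrategy.forBoundedOutOfOrderness` and in the hand-written generator of the verification version below.

```java
/**
 * Hypothetical, Flink-free sketch of a bounded-out-of-orderness watermark generator:
 * watermark = max event time seen so far - allowed out-of-orderness - 1 ms.
 */
public class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMs;
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(long outOfOrdernessMs) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        // Start low enough that the first watermark is exactly Long.MIN_VALUE, without underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMs + 1;
    }

    // Per event: only the maximum event time matters.
    public void onEvent(long eventTimeMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimeMs);
    }

    // Periodically: never moves backwards, because maxTimestamp never decreases.
    public long currentWatermark() {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(3_000L);
        long[] arrivals = {10_000L, 13_000L, 11_000L, 16_000L, 14_000L}; // out-of-order event times
        for (long t : arrivals) {
            gen.onEvent(t);
            System.out.println("event=" + t + " watermark=" + gen.currentWatermark());
        }
    }
}
```

With a 3 s bound the watermarks printed are 6999, 9999, 9999, 12999, 12999: the late 11 000 ms and 14 000 ms events do not pull the watermark back.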

Implementation 1: Development Version (must master)

Apache Flink 1.12 Documentation: Generating Watermarks

package cn.it.watermaker;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author lanson
 * Desc
 * Simulates real-time order data in the format (orderId, userId, money, eventTime).
 * Every 5 s, computes each user's total order amount within those 5 s (event-time tumbling window),
 * and adds a Watermark to tolerate a certain amount of data delay and out-of-order arrival.
 */
public class WatermakerDemo01_Develop {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source: simulated real-time orders (with delayed and out-of-order records)
        DataStream<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate delay and disorder: event time is 0 to 4 s behind the current time
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        // 3. Transformation
        // - Tell Flink to compute on event time (not needed since Flink 1.12, EventTime is the default):
        //   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // - Tell Flink which field is the event time, because
        //   Watermark = max event time seen so far - max allowed delay/out-of-orderness (Flink also subtracts 1 ms)
        /* Older API:
        DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(3)) { // max allowed delay/out-of-orderness
                    @Override
                    public long extractTimestamp(Order element) {
                        // Specify the event-time field; Flink computes the Watermark from it
                        return element.eventTime;
                    }
                });
        */
        DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((event, timestamp) -> event.getEventTime()));

        // From here on the stream carries Watermarks, so event-time windows can be computed.
        // Every 5 s, compute each user's total order amount within those 5 s (tumbling window)
        DataStream<Order> result = watermakerDS
                .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");

        // 4. Sink
        result.print();

        // 5. execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
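What the job above does with delayed records can be traced without a cluster. The following is a simplified, single-threaded simulation in plain Java, not Flink itself; the class `WatermarkWindowSimulation` and the order values are made up for illustration. Assumptions: one parallel instance, the watermark advances right after every event (Flink emits it periodically instead), and allowed lateness is 0, so a record whose window has already fired is dropped. A window fires when the watermark reaches its last millisecond, `end - 1`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation: per-user sums in 5 s tumbling windows with a 3 s out-of-orderness bound. */
public class WatermarkWindowSimulation {
    static final long WINDOW_MS = 5_000L; // tumbling window size
    static final long BOUND_MS = 3_000L;  // max allowed out-of-orderness

    // window start -> (userId -> summed money); TreeMaps keep the output ordered
    private final TreeMap<Long, TreeMap<Integer, Integer>> openWindows = new TreeMap<>();
    private long maxTimestamp = Long.MIN_VALUE + BOUND_MS + 1;
    final List<String> fired = new ArrayList<>();
    final List<String> dropped = new ArrayList<>();

    long watermark() {
        return maxTimestamp - BOUND_MS - 1;
    }

    void onOrder(int userId, int money, long eventTime) {
        long start = eventTime - Math.floorMod(eventTime, WINDOW_MS);
        if (start + WINDOW_MS - 1 <= watermark()) {
            // The window's last millisecond is already covered by the watermark: too late.
            dropped.add("user " + userId + " @" + eventTime);
        } else {
            openWindows.computeIfAbsent(start, s -> new TreeMap<>()).merge(userId, money, Integer::sum);
        }
        maxTimestamp = Math.max(maxTimestamp, eventTime);
        fireReadyWindows();
    }

    // Emit every window whose end - 1 ms is <= the watermark, oldest first.
    private void fireReadyWindows() {
        while (!openWindows.isEmpty() && openWindows.firstKey() + WINDOW_MS - 1 <= watermark()) {
            Map.Entry<Long, TreeMap<Integer, Integer>> w = openWindows.pollFirstEntry();
            long start = w.getKey();
            w.getValue().forEach((user, sum) ->
                    fired.add("[" + start + "," + (start + WINDOW_MS) + ") user " + user + " sum " + sum));
        }
    }

    // Arrival order differs from event-time order.
    static WatermarkWindowSimulation runExample() {
        WatermarkWindowSimulation sim = new WatermarkWindowSimulation();
        sim.onOrder(1, 10, 10_000L); // watermark 6999
        sim.onOrder(2, 20, 12_000L); // watermark 8999
        sim.onOrder(1, 5, 9_000L);   // out of order but within 3 s: kept
        sim.onOrder(1, 7, 14_000L);  // watermark 10999 -> fires [5000,10000)
        sim.onOrder(2, 3, 8_000L);   // its window has already fired: dropped
        sim.onOrder(1, 1, 18_000L);  // watermark 14999 -> fires [10000,15000)
        return sim;
    }

    public static void main(String[] args) {
        WatermarkWindowSimulation sim = runExample();
        sim.fired.forEach(System.out::println);
        sim.dropped.forEach(d -> System.out.println("dropped: " + d));
    }
}
```

Running it prints `[5000,10000) user 1 sum 5`, `[10000,15000) user 1 sum 17`, `[10000,15000) user 2 sum 20` and `dropped: user 2 @8000`. The 9 000 ms order arrives after the 12 000 ms one but still counts; the 8 000 ms order arrives after the watermark has passed 9 999 ms and is discarded. The [15000,20000) window never fires here because no later event pushes the watermark to 19 999 ms.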

Implementation 2: Verification Version (for understanding)


package cn.it.watermaker;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Author lanson
 * Desc
 * Simulates real-time order data in the format (orderId, userId, money, eventTime).
 * Every 5 s, computes each user's total order amount within those 5 s (event-time tumbling window),
 * and adds a Watermark to tolerate a certain amount of data delay and out-of-order arrival.
 */
public class WatermakerDemo02_Check {
    public static void main(String[] args) throws Exception {
        // FastDateFormat is thread-safe and serializable, so the functions below can capture it
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Source: simulated real-time orders (with delayed and out-of-order records)
        DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean flag = true;

            @Override
            public void run(SourceContext<Order> ctx) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(3);
                    int money = random.nextInt(100);
                    // Simulate delay and disorder: event time is 0 to 4 s behind the current time
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    System.out.println("Sent: " + userId + " : " + df.format(eventTime));
                    ctx.collect(new Order(orderId, userId, money, eventTime));
                    TimeUnit.SECONDS.sleep(1);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }
        });

        // 3. Transformation
        /*
        DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((event, timestamp) -> event.getEventTime()));
        */
        // In development, just use the built-in strategy above.
        // For learning/testing, implement the generator yourself so you can print what it does:
        DataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(
                new WatermarkStrategy<Order>() {
                    @Override
                    public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Order>() {
                            // Used only for printing: the last event this generator instance saw
                            private int userId = 0;
                            private long eventTime = 0L;
                            private final long outOfOrdernessMillis = 3000;
                            private long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;

                            @Override
                            public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                                userId = event.userId;
                                eventTime = event.eventTime;
                                maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                // Watermark = max event time seen so far - max allowed delay/out-of-orderness - 1 ms
                                Watermark watermark = new Watermark(maxTimestamp - outOfOrdernessMillis - 1);
                                System.out.println("key:" + userId
                                        + ", system time:" + df.format(System.currentTimeMillis())
                                        + ", event time:" + df.format(eventTime)
                                        + ", watermark time:" + df.format(watermark.getTimestamp()));
                                output.emitWatermark(watermark);
                            }
                        };
                    }
                }.withTimestampAssigner((event, timestamp) -> event.getEventTime()));

        // From here on the stream carries Watermarks, so event-time windows can be computed.
        /*
        DataStream<Order> result = watermakerDS
                .keyBy(Order::getUserId)
                //.timeWindow(Time.seconds(5), Time.seconds(5))
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");
        */
        // In development, use the aggregation above for the business logic.
        // For learning/testing, print more detail: for each fired window, its range and
        // the event times of the records it contains.
        DataStream<String> result = watermakerDS
                .keyBy(Order::getUserId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // Apply the function to all records in the window: WindowFunction<IN, OUT, KEY, W extends Window>
                .apply(new WindowFunction<Order, String, Integer, TimeWindow>() {
                    @Override
                    public void apply(Integer key, TimeWindow window, Iterable<Order> input, Collector<String> out) throws Exception {
                        // Collect the event times of the records that belong to this window
                        List<String> eventTimeList = new ArrayList<>();
                        for (Order order : input) {
                            Long eventTime = order.eventTime;
                            eventTimeList.add(df.format(eventTime));
                        }
                        String outStr = String.format("key:%s, window start~end: [%s~%s), event times in window: %s",
                                key.toString(), df.format(window.getStart()), df.format(window.getEnd()), eventTimeList);
                        out.collect(outStr);
                    }
                });

        // 4. Sink
        result.print();

        // 5. execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String orderId;
        private Integer userId;
        private Integer money;
        private Long eventTime;
    }
}
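When reading the verification output, note that the WatermarkStrategy creates one generator per parallel subtask of the assignTimestampsAndWatermarks operator, not one per key, so the printed `key:` is only the last userId that subtask happened to see. The demo does not call setParallelism, so when it runs with a parallelism greater than 1 the window operator receives watermarks from several upstream channels, and its event-time clock is the minimum of them. A minimal plain-Java sketch of that rule; the class `MinWatermarkSketch` is hypothetical, not Flink API.

```java
import java.util.Arrays;

/** Hypothetical sketch: an operator's watermark is the minimum over its input channels. */
public class MinWatermarkSketch {
    private final long[] channelWatermarks;

    public MinWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    // A watermark on one channel only moves forward.
    public void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    // The slowest channel decides the operator's current watermark.
    public long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        MinWatermarkSketch windowOp = new MinWatermarkSketch(2);
        windowOp.onWatermark(0, 14_999L);
        System.out.println(windowOp.currentWatermark()); // prints -9223372036854775808 (channel 1 silent)
        windowOp.onWatermark(1, 12_999L);
        System.out.println(windowOp.currentWatermark()); // prints 12999
    }
}
```

This is why, with several parallel generators, a window may fire later than the watermark printed by any single generator would suggest.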
