日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Flink(二十四):​​​​​​​Allowed Lateness案例演示

發布時間:2023/11/28 生活经验 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Flink(二十四):​​​​​​​Allowed Lateness案例演示 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Allowed Lateness案例演示

需求

有訂單數據,格式為: (訂單ID,用戶ID,時間戳/事件時間,訂單金額)

要求每隔5s,計算5秒內,每個用戶的訂單總金額

并添加Watermaker來解決一定程度上的數據延遲和數據亂序問題。

并使用OutputTag+allowedLateness解決數據丟失問題

API

package cn.it.watermaker;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;import java.time.Duration;
import java.util.Random;
import java.util.UUID;/*** Author lanson* Desc* 模擬實時訂單數據,格式為: (訂單ID,用戶ID,訂單金額,時間戳/事件時間)* 要求每隔5s,計算5秒內(基于時間的滾動窗口),每個用戶的訂單總金額* 并添加Watermaker來解決一定程度上的數據延遲和數據亂序問題。*/
public class WatermakerDemo03_AllowedLateness {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//模擬實時訂單數據(數據有延遲和亂序)DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {private boolean flag = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(3);int money = random.nextInt(100);//模擬數據延遲和亂序!long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;ctx.collect(new Order(orderId, userId, money, eventTime));//TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {flag = false;}});//3.TransformationDataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> event.getEventTime()));//代碼走到這里,就已經被添加上Watermaker了!接下來就可以進行窗口計算了//要求每隔5s,計算5秒內(基于時間的滾動窗口),每個用戶的訂單總金額OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));SingleOutputStreamOperator<Order> result = watermakerDS.keyBy(Order::getUserId)//.timeWindow(Time.seconds(5), Time.seconds(5)).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(5)).sideOutputLateData(outputTag).sum("money");DataStream<Order> result2 = result.getSideOutput(outputTag);//4.Sinkresult.print("正常的數據和遲到不嚴重的數據");result2.print("遲到嚴重的數據");//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long eventTime;}
}

總結

以上是生活随笔為你收集整理的2021年大数据Flink(二十四):​​​​​​​Allowed Lateness案例演示的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。