2021年大数据Flink(二十四):Allowed Lateness案例演示
生活随笔
收集整理的這篇文章主要介紹了
2021年大数据Flink(二十四):Allowed Lateness案例演示
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Allowed Lateness案例演示
需求
有訂單數據,格式為: (訂單ID,用戶ID,時間戳/事件時間,訂單金額)
要求每隔5s,計算5秒內,每個用戶的訂單總金額
并添加Watermaker來解決一定程度上的數據延遲和數據亂序問題。
并使用OutputTag+allowedLateness解決數據丟失問題
API
package cn.it.watermaker;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;import java.time.Duration;
import java.util.Random;
import java.util.UUID;/*** Author lanson* Desc* 模擬實時訂單數據,格式為: (訂單ID,用戶ID,訂單金額,時間戳/事件時間)* 要求每隔5s,計算5秒內(基于時間的滾動窗口),每個用戶的訂單總金額* 并添加Watermaker來解決一定程度上的數據延遲和數據亂序問題。*/
public class WatermakerDemo03_AllowedLateness {public static void main(String[] args) throws Exception {//1.envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.Source//模擬實時訂單數據(數據有延遲和亂序)DataStreamSource<Order> orderDS = env.addSource(new SourceFunction<Order>() {private boolean flag = true;@Overridepublic void run(SourceContext<Order> ctx) throws Exception {Random random = new Random();while (flag) {String orderId = UUID.randomUUID().toString();int userId = random.nextInt(3);int money = random.nextInt(100);//模擬數據延遲和亂序!long eventTime = System.currentTimeMillis() - random.nextInt(10) * 1000;ctx.collect(new Order(orderId, userId, money, eventTime));//TimeUnit.SECONDS.sleep(1);}}@Overridepublic void cancel() {flag = false;}});//3.TransformationDataStream<Order> watermakerDS = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((event, timestamp) -> event.getEventTime()));//代碼走到這里,就已經被添加上Watermaker了!接下來就可以進行窗口計算了//要求每隔5s,計算5秒內(基于時間的滾動窗口),每個用戶的訂單總金額OutputTag<Order> outputTag = new OutputTag<>("Seriouslylate", TypeInformation.of(Order.class));SingleOutputStreamOperator<Order> result = watermakerDS.keyBy(Order::getUserId)//.timeWindow(Time.seconds(5), Time.seconds(5)).window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(5)).sideOutputLateData(outputTag).sum("money");DataStream<Order> result2 = result.getSideOutput(outputTag);//4.Sinkresult.print("正常的數據和遲到不嚴重的數據");result2.print("遲到嚴重的數據");//5.executeenv.execute();}@Data@AllArgsConstructor@NoArgsConstructorpublic static class Order {private String orderId;private Integer userId;private Integer money;private Long eventTime;}
}
總結
以上是生活随笔為你收集整理的2021年大数据Flink(二十四):Allowed Lateness案例演示的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Flink(二十三):
- 下一篇: 2021年大数据Flink(二十五):F