日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

初学Flink,对Watermarks的一些理解和感悟(透彻2)

發布時間:2025/7/14 22 豆豆
生活随笔 收集整理的這篇文章主要介紹了 初学Flink,对Watermarks的一些理解和感悟(透彻2) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

2019獨角獸企業重金招聘Python工程師標準>>>

官方文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
翻譯:https://www.jianshu.com/p/68ab40c7f347

1. 幾個重要的概念簡述:

  • Window:Window是處理無界流的關鍵,Windows將流拆分為一個個有限大小的buckets,可以可以在每一個buckets中進行計算

  • start_time,end_time:當Window時時間窗口的時候,每個window都會有一個開始時間和結束時間(前開后閉),這個時間是系統時間

  • event-time: 事件發生時間,是事件發生所在設備的當地時間,比如一個點擊事件的時間發生時間,是用戶點擊操作所在的手機或電腦的時間

  • Watermarks:可以把他理解為一個水位線,這個Watermarks在不斷的變化,一旦Watermarks大于了某個window的end_time,就會觸發此window的計算,Watermarks就是用來觸發window計算的。

2.如何使用Watermarks處理亂序的數據流

什么是亂序呢?可以理解為數據到達的順序和他的event-time排序不一致。導致這的原因有很多,比如延遲,消息積壓,重試等等

因為Watermarks是用來觸發window窗口計算的,我們可以根據事件的event-time,計算出Watermarks,并且設置一些延遲,給遲到的數據一些機會。

假如我們設置10s的時間窗口(window),那么0~10s,10~20s都是一個窗口,以0~10s為例,0位start-time,10為end-time。假如有4個數據的event-time分別是8(A),12.5(B),9(C),13.5(D),我們設置Watermarks為當前所有到達數據event-time的最大值減去延遲值3.5秒

當A到達的時候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會觸發計算
當B到達的時候,Watermarks為max(12.8,5)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當C到達的時候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當D到達的時候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發計算
觸發計算的時候,會將AC(因為他們都小于10)都計算進去

通過上面這種方式,我們就將遲到的C計算進去了

這里的延遲3.5s是我們假設一個數據到達的時候,比他早3.5s的數據肯定也都到達了,這個是需要根據經驗推算的,加入D到達以后有到達了一個E,event-time=6,但是由于0~10的時間窗口已經開始計算了,所以E就丟了。

3.看一個代碼的實際例子

下面代碼中的BoundedOutOfOrdernessGenerator就是一個典型的Watermarks實例

package xuwei.tech;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.meituan.flink.common.conf.FlinkConf; import com.meituan.flink.common.kafka.MTKafkaConsumer08; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.text.SimpleDateFormat; import java.util.Date;/*** Created by smile on 14/11/2017. 統計每 10 秒內每種操作有多少個*/ public class EventTimeWindowCount {private static final Logger logger = LoggerFactory.getLogger(EventTimeWindowCount.class);public static void main(String[] args) throws Exception { // 獲取作業名String jobName = FlinkConf.getJobName(args); // 獲取執行環境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設置使用 EventTime// 作為時間戳(默認是// ProcessingTime)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 開啟 Checkpoint(每 10 秒保存一次檢查點,模式為 Exactly Once)env.enableCheckpointing(10000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 設置從 Kafka 的 topic// "log.orderlog" 中讀取數據MYKafkaConsumer08 consumer = new MYKafkaConsumer08(jobName);DataStream<String> stream = env.addSource(consumer.getInstance("log.orderlog", new SimpleStringSchema())); // 默認接上次開始消費,以下的寫法(setStartFromLatest)可以從最新開始消費,相應的還有(setStartFromEarliest// 從最舊開始消費)// DataStream<String> stream =// env.addSource(consumer.getInstance("log.orderlog", new// SimpleStringSchema()).setStartFromLatest());DataStream<String> orderAmount = // 將讀入的字符串轉化為 OrderRecord 對象stream.map(new ParseOrderRecord()) // 設置從 OrderRecord 對象中提取時間戳的方式,下文 BoundedOutOfOrdernessGenerator// 類中具體實現該方法.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator()) // 用 OrderRecord 對象的 action// 字段進行分流(相同 action// 的進入相同流,不同 action// 的進入不同流).keyBy("action") // 觸發 10s 的滾動窗口,即每十秒的數據進入同一個窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))) // 將同一窗口的每個 OrderRecord 對象的 count// 字段加起來(其余字段只保留第一個進入該窗口的,后進去的丟棄).sum("count") // 將結果從 OrderRecord 對象轉換為 String,每十萬條輸出一條.flatMap(new ParseResult()); // 如果想每條都輸出來,那就輸得慢一點,每 10 秒輸出一條數據(請將上一行的 flatMap 換成下一行的 map)// .map(new ParseResultSleep());// 輸出結果(然后就可以去 Task Manage 的 Stdout 里面看)// 小數據量測試的時候可以這么寫,正式上線的時候不要這么寫!數據量大建議還是寫到 Kafka Topic 或者其他的下游里面去orderAmount.print();env.execute(jobName);}public static class ParseOrderRecord implements MapFunction<String, OrderRecord> {@Overridepublic OrderRecord map(String s) throws Exception {JSONObject jsonObject = JSON.parseObject(s);long id = jsonObject.getLong("id");int dealId = jsonObject.getInteger("dealid");String action = jsonObject.getString("_mt_action");double amount = jsonObject.getDouble("amount");String timestampString = jsonObject.getString("_mt_datetime"); // 將字符串格式的時間戳解析為 long 類型,單位毫秒SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");Date timestampDate = simpleDateFormat.parse(timestampString);long timestamp = timestampDate.getTime();return new OrderRecord(id, dealId, action, amount, timestamp);}}public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> {private final long maxOutOfOrderness = 3500; // 3.5 secondsprivate long currentMaxTimestamp;@Overridepublic long extractTimestamp(OrderRecord record, long previousElementTimestamp) { // 將數據中的時間戳字段(long 類型,精確到毫秒)賦給// timestamp 變量,此處是// OrderRecord 的 timestamp// 字段long timestamp = record.timestamp;currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);return timestamp;}@Overridepublic Watermark getCurrentWatermark() { // return the watermark as current highest timestamp minus the// out-of-orderness boundreturn new Watermark(currentMaxTimestamp - maxOutOfOrderness);}}public static class ParseResult implements FlatMapFunction<OrderRecord, String> {private static long msgCount = 0;@Overridepublic void flatMap(OrderRecord record, Collector<String> out) throws Exception { // 每十萬條輸出一條,防止輸出太多在 Task// Manage 的 Stdout 里面刷新不出來if (msgCount == 0) {out.collect("Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp)+ " action: " + record.action + " count = " + record.count);msgCount = 0;}msgCount++;msgCount %= 100000;}}public static class ParseResultSleep implements MapFunction<OrderRecord, String> {@Overridepublic String map(OrderRecord record) throws Exception { // 每 10 秒輸出一條數據,防止輸出太多在 Task Manage 的 Stdout 里面刷新不出來// 正式上線的時候不要這么寫!Thread.sleep(10000);return "Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: "+ record.action + " count = " + record.count;}}public static class OrderRecord { public long id; public int dealId; public String action; public double amount; public long timestamp; public long count; public OrderRecord() {} public OrderRecord(long id, int dealId, String action, double amount, long timestamp) { this.id = id; this.dealId = dealId; this.action = action; this.amount = amount; this.timestamp = timestamp; this.count = 1;}} }


?

轉載于:https://my.oschina.net/xiaominmin/blog/3057628

總結

以上是生活随笔為你收集整理的初学Flink,对Watermarks的一些理解和感悟(透彻2)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 蜜臀99久久精品久久久久小说 | 国产无码精品在线播放 | 国产热99 | 亚洲痴女| 国产精品嫩 | 成人国产精品入口免费视频 | 亚洲人精品 | 婷婷伊人综合中文字幕 | 三级免费黄录像 | 欧美高清hd | 黄色成人在线网站 | 一区二区三区视频播放 | 60分钟| 操日本老妇 | 亚洲国产97在线精品一区 | 视频免费在线观看 | 日本欧美www | 国产极品美女高潮无套在线观看 | 又色又爽又黄18网站 | 亚欧洲精品在线视频 | a一级视频| 337p嫩模大胆色肉噜噜噜 | 亚洲成人va| 日韩欧美视频在线 | 国产一级片黄色 | 白浆在线 | 九九热精品在线视频 | 一区二区乱子伦在线播放 | 蜜桃视频成人在线观看 | 麻豆av一区 | 国产在线一区二区三区 | 亚洲综合一区二区三区 | 玖玖精品视频 | 欧美天天爽 | 欧美日韩一二三四区 | 久久午夜伦理 | 裸体女视频 | 麻豆tube | 国产原创一区 | 亚洲动漫精品 | 亚洲区视频在线观看 | 天堂精品一区二区三区 | 欧美精品免费一区二区 | 色八戒av | 一区二区三区高清在线 | 手机在线看片1024 | 曰韩毛片 | 欧美一区二区三区激情 | 亚洲国产一区二区在线 | 亚洲精品视 | 人禽高h交 | 亚洲黄色一级大片 | 最新免费av | 亚洲高潮无码久久 | 天堂免费在线视频 | www.4虎| 美女黄页网站 | 免费看欧美片 | 免费av资源 | 天天摸天天舔 | 久久69| 欧美色悠悠 | 极品在线播放 | 蜜桃精品视频在线 | www伊人 | 成人看片网站 | 色吧久久 | 中文字幕无码精品亚洲35 | 免费成人黄色网址 | 亚洲男人天堂网址 | 成人h动漫精品一区二 | 国产精品欧美一区二区 | 亚州av成人| 少妇太紧太爽又黄又硬又爽小说 | 日本一区二区视频免费 | chinese xxxx videos andvr | 激情五月色播五月 | 97人人艹 | 国产情趣视频 | 91a视频| 日韩欧美一级大片 | 国产精品午夜电影 | 久久综合社区 | av一区在线播放 | 日韩黄色大片 | 国产成人一区在线观看 | 日本精品在线播放 | 在线免费三级 | 先锋影音av资源在线 | 乌克兰av在线 | 亚洲色图激情小说 | av免费入口 | 日韩精品在线观看一区二区三区 | 亚洲制服在线观看 | 日韩美女福利视频 | 女仆裸体打屁屁羞羞免费 | 久久久999国产精品 天堂av中文在线 | 91麻豆精品国产91久久久久久 | 狠狠干欧美 |