Flink EventTime和Watermarks原理结合代码分析(转载+解决+精简记录)
Apache Flink 框架保證Watermark單調(diào)遞增,算子接收到一個Watermark時候,框架知道不會再有任何小于該Watermark的時間戳的數(shù)據(jù)元素到來了,所以Watermark可以看做是告訴Apache Flink框架數(shù)據(jù)流已經(jīng)處理到什么位置(時間維度)的方式。[3]
?
這個博客整來自[1]的內(nèi)容
①代碼
②輸入數(shù)據(jù)如下:
nc -lk 3456
(下面的表格在復(fù)習(xí)溫故的時候記得從①②③④⑤的順序來看,數(shù)據(jù)->時間戳->水位線->水位線和窗口的比較->決定是否觸發(fā))
| 輸入數(shù)據(jù)① | 輸入數(shù)據(jù)的對應(yīng)時間② | 窗口區(qū)間④ [window_start_time, window_end_time) | WaterMark③ | 備注⑤ |
| 0001,1538359882000 | 2018-10-01 10:11:22 | ? | 2018-10-01 10:11:12.000 | ? |
| 0001,1538359886000 | 2018-10-01 10:11:26 | ? | 2018-10-01 10:11:16.000 | 因為有亂序設(shè)置,所以有10s差距 |
| 0001,1538359892000 | 2018-10-01 10:11:32 | ? | 2018-10-01 10:11:22.000 | ? |
| 0001,1538359893000 | 2018-10-01 10:11:33 | ? | 2018-10-01 10:11:23.000 | ? |
| 0001,1538359894000 | 2018-10-01 10:11:34 | 2018-10-01 10:11:22.000(輸入數(shù)據(jù)的時間戳) ? 2018-10-01 10:11:24.000 | 2018-10-01 10:11:24.000 遲到數(shù)據(jù)可以理解為,我本來想要34的水位線,但是有10s左右的遲到數(shù)據(jù),所以實際水位線是24 | 觸發(fā)waterMark計算 10:11:34-10=10:11:24 ? final Long maxOutOfOrderness = 10000L; |
| 0001,1538359896000 | 2018-10-01 10:11:36 | ? | 2018-10-01 10:11:26.000 | ? |
| 0001,1538359897000 | 2018-10-01 10:11:37 | 2018-10-01 10:11:24.000 2018-10-01 10:11:27.000 | 2018-10-01 10:11:27.000 | 觸發(fā)waterMark計算 |
| 0001,1538359899000 | 2018-10-01 10:11:39 | ? | 2018-10-01 10:11:29.000 | ? |
| 0001,1538359891000(亂序數(shù)據(jù)) | 2018-10-01 10:11:31 | ? | 2018-10-01 10:11:29.000 | ? |
| 0001,1538359903000(亂序數(shù)據(jù)) | 2018-10-01 10:11:43 | 2018-10-01 10:11:30.000 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 觸發(fā)waterMark計算 |
| 0001,1538359890000(延遲數(shù)據(jù)) | 2018-10-01 10:11:30 | 2018-10-01 10:11:30.000 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 觸發(fā)waterMark計算 |
| 0001,1538359903000(延遲數(shù)據(jù)) | 2018-10-01 10:11:43 | ? | 2018-10-01 10:11:33.000 | ? |
| 0001,1538359891000(延遲數(shù)據(jù)) | 2018-10-01 10:11:31 | 2018-10-01 10:11:30.000 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 觸發(fā)waterMark計算 |
| 0001,1538359892000(延遲數(shù)據(jù)) | 2018-10-01 10:11:32 | 2018-10-01 10:11:30.000 2018-10-01 10:11:33.000 | 2018-10-01 10:11:33.000 | 觸發(fā)waterMark計算 |
讀表格中的解釋:
1.由于亂序時間maxOutOfOrderness的設(shè)置,導(dǎo)致waterMark的數(shù)值落后于最新且最大的數(shù)據(jù)大約10s
2.延遲數(shù)據(jù)/遲到數(shù)據(jù) 指的是時間戳小于水位線,但是比前一條數(shù)據(jù)來得晚的被接收的。
3.上面表格的第三列就是[window_start_time,window_end_time)
4.如果window大小是3秒,對應(yīng)代碼是:
TumblingEventTimeWindows.of(Time.seconds(3))
那么1分鐘的區(qū)間內(nèi),會把window劃分為如下的形式【左閉右開】
[window_start_time,window_end_time] [00:00:00,00:00:03) [00:00:03,00:00:06) [00:00:06,00:00:09) [00:00:09,00:00:12) [00:00:12,00:00:15) [00:00:15,00:00:18) [00:00:18,00:00:21) [00:00:21,00:00:24) [00:00:24,00:00:27) [00:00:27,00:00:30) [00:00:30,00:00:33) [00:00:33,00:00:36) [00:00:36,00:00:39) [00:00:39,00:00:42) [00:00:42,00:00:45) [00:00:45,00:00:48) [00:00:48,00:00:51) [00:00:51,00:00:54) [00:00:54,00:00:57) [00:00:57,00:01:00)在滿足:
一,watermark時間 >= 上面的window_end_time,
二,在[window_start_time,window_end_time)中有數(shù)據(jù)存在
才會觸發(fā)WaterMark流計算。
亂序參數(shù)maxOutOfOrderness修改的是水位線
延遲參數(shù)修改的是延遲數(shù)據(jù)
--------------------------------------------------------遲到數(shù)據(jù)處理--------------------------------------------------------------------------------------
①代碼
②對于遲到的數(shù)據(jù),都通過sideOutputLateData保存到了outputTag中
----------------------------------------------------------一圖解千言---------------------------------------------------------------------------------------
對于上面的概念,我畫了一張圖:
這個圖的大意如下:
當(dāng)WaterMark上升至window_end_time的時候,觸發(fā)計算。
EventTime表示數(shù)據(jù)還在水平面(WaterMark)的上方,沒有最終落入水面以下。
所以有WaterMark+maxOutOfOrderness=Event TIme
在觸發(fā)計算的同時,還有一部分數(shù)據(jù)沒有到位,這些數(shù)據(jù)是延遲數(shù)據(jù)/遲到數(shù)據(jù)。
在延遲數(shù)據(jù)傳播的同時,WaterMark也在不斷上升,當(dāng)WaterMark上升至window_end_time+allowedLateness(Time.seconds(2))的時候,即使遲到的數(shù)據(jù)到達,也會被丟棄。
?
延遲數(shù)據(jù)打個比方:
木塊(流數(shù)據(jù)中的水滴)投入水位很淺的臉盆,可以到底部(觸發(fā)計算)。
木塊(流數(shù)據(jù)中的水滴)投入水位很高的臉盆,難以到底部(數(shù)據(jù)丟棄)。
?
?
Reference:
[1]Flink EventTime和Watermarks案例分析
[2]在線工具
[3]Apache Flink 漫談系列(03) - Watermark
總結(jié)
以上是生活随笔為你收集整理的Flink EventTime和Watermarks原理结合代码分析(转载+解决+精简记录)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql怎么查看有没有死锁
- 下一篇: flink中的WaterMark调研和具