flink批处理
4.1 State
4.1.1 State Overview
Apache Flink® — Stateful Computations over Data Streams
Let's revisit the word-count example.
java
/**
 * Word count
 */
public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> data = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0)
          .sum(1);
        result.print();
        env.execute("WordCount");
    }
}
Input:
java
hadoop,hadoop
hadoop
hive,hadoop
Output:
java
4> (hadoop,1)
4> (hadoop,2)
4> (hadoop,3)
1> (hive,1)
4> (hadoop,4)
Notice that the counts accumulate across inputs. Without state management there would be no accumulation, which is why Flink has the concept of state.
[Figure: State.png]
State generally refers to the state of a specific task/operator. State can be recorded and recovered after a failure. Flink has two basic kinds of state, Keyed State and Operator State, and both can exist in two forms: raw state and managed state.
Managed state: state managed by the Flink framework; this is what we normally use.
Raw state: the user manages the concrete data structure of the state. During a checkpoint the framework reads and writes the state as byte[] and knows nothing about its internal structure. Managed state is recommended for state on a DataStream; raw state is only needed when implementing a custom operator. Since it is rarely used in practice, we do not cover it further.
4.1.2 State Types
Operator State
[Figure: Operator State.png]
Operator state is task-level state; in other words, each task has one state of its own.
For example, each partition (task) of the Kafka connector source must remember the topic, partition, and offset it has consumed.
Operator state has only one kind of managed state:
- ListState
Keyed State
[Figure: Keyed State.png]
4.1.3 Keyed State Examples
ValueState
java
/**
 * ValueState: keeps a single value per key
 * value()  get the state value
 * update() update the state value
 * clear()  clear the state
 */
public class CountWindowAverageWithValueState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // keeps, for each key, how many elements have been seen and the sum of their values
    // managed keyed state
    // 1. ValueState keeps a single state value for each key
    private ValueState<Tuple2<Long, Long>> countAndSum;
}
/**
 * Requirement: once 3 or more elements with the same key have been received,
 * compute the average of their values,
 * i.e. the average of every 3 elements per key in the keyed stream.
 */
public class TestKeyedStateMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(
                Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
        // Output:
        // (1,5.0)
        // (2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithValueState())
                .print();
        env.execute("TestStatefulApi");
    }
}
Output:
java
3> (1,5.0)
4> (2,3.6666666666666665)
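The class above only shows the state field; the `open()` and `flatMap()` bodies are omitted. Below is a minimal sketch of how they could look, assuming the usual Flink 1.x RichFlatMapFunction lifecycle (the descriptor name "average" and the fixed threshold of 3 are illustrative choices, and the needed imports are org.apache.flink.api.common.state.*, org.apache.flink.api.common.typeinfo.Types, and org.apache.flink.configuration.Configuration).
java
// Sketch of the omitted open()/flatMap() bodies of CountWindowAverageWithValueState
@Override
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ValueStateDescriptor<>("average", Types.TUPLE(Types.LONG, Types.LONG));
    countAndSum = getRuntimeContext().getState(descriptor);
}

@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
    Tuple2<Long, Long> currentState = countAndSum.value();
    if (currentState == null) {
        currentState = Tuple2.of(0L, 0L);
    }
    currentState.f0 += 1;           // element count for this key
    currentState.f1 += element.f1;  // running sum of values for this key
    countAndSum.update(currentState);

    if (currentState.f0 >= 3) {
        double avg = (double) currentState.f1 / currentState.f0;
        out.collect(Tuple2.of(element.f0, avg));
        countAndSum.clear();        // start over for the next 3 elements
    }
}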
ListState
java
/**
 * ListState: keeps a list of values per key
 * get()            get the state values
 * add() / addAll() add elements to the state
 * clear()          clear the state
 */
public class CountWindowAverageWithListState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // managed keyed state
    // 1. ListState keeps all elements seen so far for a given key
    private ListState<Tuple2<Long, Long>> elementsByKey;
}
/**
 * Requirement: once 3 or more elements with the same key have been received,
 * compute the average of their values,
 * i.e. the average of every 3 elements per key in the keyed stream.
 */
public class TestKeyedStateMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(
                Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
        // Output:
        // (1,5.0)
        // (2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithListState())
                .print();
        env.execute("TestStatefulApi");
    }
}
Output:
java
3> (1,5.0)
4> (2,3.6666666666666665)
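As before, only the state field is shown. A minimal sketch of the missing bodies, assuming the same threshold of 3 elements (the descriptor name and the java.util.List/ArrayList imports are assumptions):
java
@Override
public void open(Configuration parameters) throws Exception {
    ListStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ListStateDescriptor<>("elements", Types.TUPLE(Types.LONG, Types.LONG));
    elementsByKey = getRuntimeContext().getListState(descriptor);
}

@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
    // append the current element to the list state of this key
    elementsByKey.add(element);

    // copy the state into a local list so it can be counted
    List<Tuple2<Long, Long>> all = new ArrayList<>();
    for (Tuple2<Long, Long> e : elementsByKey.get()) {
        all.add(e);
    }
    if (all.size() >= 3) {
        long sum = 0;
        for (Tuple2<Long, Long> e : all) {
            sum += e.f1;
        }
        out.collect(Tuple2.of(element.f0, (double) sum / all.size()));
        elementsByKey.clear();
    }
}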
MapState
java
/**
 * MapState<K, V>: keeps a map per key
 * put()    add a key/value pair to the state
 * values() get all values stored in the MapState
 * clear()  clear the state
 */
public class CountWindowAverageWithMapState
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Double>> {
    // managed keyed state
    // 1. MapState: the map key is a unique value, the map value is the value carried by an incoming element of this key
    private MapState<String, Long> mapState;
}
/**
 * Requirement: once 3 or more elements with the same key have been received,
 * compute the average of their values,
 * i.e. the average of every 3 elements per key in the keyed stream.
 */
public class TestKeyedStateMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = env.fromElements(
                Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L),
                Tuple2.of(2L, 4L), Tuple2.of(2L, 2L), Tuple2.of(2L, 5L));
        // Output:
        // (1,5.0)
        // (2,3.6666666666666665)
        dataStreamSource
                .keyBy(0)
                .flatMap(new CountWindowAverageWithMapState())
                .print();
        env.execute("TestStatefulApi");
    }
}
Output:
4> (2,3.6666666666666665)
3> (1,5.0)
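Again only the field is shown. A minimal sketch of the missing bodies, assuming a random UUID is used as the map key so that every incoming value is kept (descriptor name and java.util.UUID/List/ArrayList imports are assumptions):
java
@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<String, Long> descriptor =
            new MapStateDescriptor<>("values", String.class, Long.class);
    mapState = getRuntimeContext().getMapState(descriptor);
}

@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Double>> out) throws Exception {
    // use a random UUID as the map key so every incoming value is stored
    mapState.put(UUID.randomUUID().toString(), element.f1);

    List<Long> values = new ArrayList<>();
    for (Long v : mapState.values()) {
        values.add(v);
    }
    if (values.size() >= 3) {
        long sum = 0;
        for (Long v : values) {
            sum += v;
        }
        out.collect(Tuple2.of(element.f0, (double) sum / values.size()));
        mapState.clear();
    }
}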
ReducingState
java
/**
 * ReducingState: keeps one aggregated value per key
 * get()   get the state value
 * add()   add a value to the state (it is folded into the aggregate)
 * clear() clear the state
 */
public class SumFunction
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    // managed keyed state
    // keeps the running sum of the values for each key
    private ReducingState<Long> sumState;
}
public class TestKeyedStateMain2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    }
}
Output:
4> (2,4)
4> (2,6)
4> (2,11)
3> (1,3)
3> (1,8)
3> (1,15)
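The SumFunction body is omitted above. Given the output (a running total per key after every element), a minimal sketch could look like the following; the descriptor name "sum" is an assumption:
java
@Override
public void open(Configuration parameters) throws Exception {
    ReducingStateDescriptor<Long> descriptor = new ReducingStateDescriptor<>(
            "sum",
            new ReduceFunction<Long>() {
                @Override
                public Long reduce(Long a, Long b) throws Exception {
                    return a + b;
                }
            },
            Long.class);
    sumState = getRuntimeContext().getReducingState(descriptor);
}

@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, Long>> out) throws Exception {
    // the ReduceFunction folds the new value into the running sum for this key
    sumState.add(element.f1);
    // emit the running total after every element
    out.collect(Tuple2.of(element.f0, sumState.get()));
}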
AggregatingState
java
public class ContainsValueFunction
extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
}
public class TestKeyedStateMain2 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    }
}
Output:
4> (2,Contains:4)
3> (1,Contains:3)
3> (1,Contains:3 and 5)
3> (1,Contains:3 and 5 and 7)
4> (2,Contains:4 and 2)
4> (2,Contains:4 and 2 and 5)
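The ContainsValueFunction body is not shown. Based on the output format "Contains:x and y and z", a minimal sketch with AggregatingState could look like this; the field, descriptor name, and accumulator format are assumptions:
java
private AggregatingState<Long, String> containsValueState;

@Override
public void open(Configuration parameters) throws Exception {
    AggregatingStateDescriptor<Long, String, String> descriptor =
            new AggregatingStateDescriptor<>("contains", new AggregateFunction<Long, String, String>() {
                @Override
                public String createAccumulator() {
                    return "Contains:";
                }

                @Override
                public String add(Long value, String accumulator) {
                    if ("Contains:".equals(accumulator)) {
                        return accumulator + value;
                    }
                    return accumulator + " and " + value;
                }

                @Override
                public String getResult(String accumulator) {
                    return accumulator;
                }

                @Override
                public String merge(String a, String b) {
                    return a + " and " + b;
                }
            }, String.class);
    containsValueState = getRuntimeContext().getAggregatingState(descriptor);
}

@Override
public void flatMap(Tuple2<Long, Long> element, Collector<Tuple2<Long, String>> out) throws Exception {
    // fold the incoming value into the accumulated description and emit it
    containsValueState.add(element.f1);
    out.collect(Tuple2.of(element.f0, containsValueState.get()));
}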
4.1.4 Operator State Example
ListState
java
public class CustomSink
implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {
}
/**
 * Requirement: print the buffered results once every two elements
 */
public class TestOperatorStateMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromElements(
                Tuple2.of("Spark", 3), Tuple2.of("Hadoop", 5), Tuple2.of("Hadoop", 7), Tuple2.of("Spark", 4));
        dataStreamSource.addSink(new CustomSink(2)).setParallelism(1);
        env.execute("TestStatefulApi");
    }
}
Output:
custom format:[(Spark,3), (Hadoop,5)]
custom format:[(Hadoop,7), (Spark,4)]
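The CustomSink above is shown only as an empty shell. A minimal sketch of how it could be implemented follows, using operator ListState through the CheckpointedFunction interface (this mirrors the buffering-sink pattern from the Flink documentation; the descriptor name and the use of Types.TUPLE are assumptions):
java
public class CustomSink
        implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction {

    private final int threshold;
    // local buffer holding the elements received since the last print
    private List<Tuple2<String, Integer>> bufferedElements = new ArrayList<>();
    // operator (non-keyed) list state used to checkpoint the buffer
    private ListState<Tuple2<String, Integer>> checkpointedState;

    public CustomSink(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            // print one batch every `threshold` elements
            System.out.println("custom format:" + bufferedElements);
            bufferedElements.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // replace the state content with the current buffer
        checkpointedState.clear();
        checkpointedState.addAll(bufferedElements);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>(
                "buffered-elements", Types.TUPLE(Types.STRING, Types.INT));
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);
        if (context.isRestored()) {
            // on recovery, refill the local buffer from the checkpointed state
            for (Tuple2<String, Integer> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}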
4.1.5 Keyed State Example
Requirement: join the records from the two streams that share the same order ID and emit them together.
OrderInfo1 data
java
123,拖把,30.0
234,牙膏,20.0
345,被子,114.4
333,杯子,112.2
444,Mac電腦,30000.0
OrderInfo2 data
java
123,2019-11-11 10:11:12,江蘇
234,2019-11-11 11:11:13,云南
345,2019-11-11 12:11:14,安徽
333,2019-11-11 13:11:15,北京
444,2019-11-11 14:11:16,深圳
Implementation:
java
public class Constants {
    public static final String ORDER_INFO1_PATH = "D:\\kkb\\flinklesson\\src\\main\\input\\OrderInfo1.txt";
    public static final String ORDER_INFO2_PATH = "D:\\kkb\\flinklesson\\src\\main\\input\\OrderInfo2.txt";
}
java
public class OrderInfo1 {
    // order ID
    private Long orderId;
    // product name
    private String productName;
    // price
    private Double price;

    public OrderInfo1() {
    }

    public OrderInfo1(Long orderId, String productName, Double price) {
        this.orderId = orderId;
        this.productName = productName;
        this.price = price;
    }
}
java
public class OrderInfo2 {
    // order ID
    private Long orderId;
    // order time
    private String orderDate;
    // delivery address
    private String address;
}
java
/**
 * Custom source
 */
public class FileSource implements SourceFunction<String> {
    // file path
    public String filePath;
    private InputStream inputStream;
    private BufferedReader reader;
    private Random random = new Random();

    public FileSource(String filePath) {
        this.filePath = filePath;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        reader = new BufferedReader(new InputStreamReader(new FileInputStream(filePath)));
        String line = null;
        while ((line = reader.readLine()) != null) {
            // simulate a delay between records
            TimeUnit.MILLISECONDS.sleep(random.nextInt(500));
            // emit the record
            ctx.collect(line);
        }
        if (reader != null) {
            reader.close();
        }
        if (inputStream != null) {
            inputStream.close();
        }
    }

    @Override
    public void cancel() {
        try {
            if (reader != null) {
                reader.close();
            }
            if (inputStream != null) {
                inputStream.close();
            }
        } catch (Exception e) {
        }
    }
}
java
public class OrderStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> info1 = env.addSource(new FileSource(Constants.ORDER_INFO1_PATH));
        DataStreamSource<String> info2 = env.addSource(new FileSource(Constants.ORDER_INFO2_PATH));
        // the join itself is not shown here; see the sketch below
    }
}
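The main method above stops after creating the two sources. A minimal sketch of one way to finish the join follows, keyed by orderId and using one ValueState per side inside a RichCoFlatMapFunction. It assumes both POJOs have the usual getters and setters (not shown above) so that Flink treats them as POJOs; the class name EnrichOrderFunction and the state descriptor names are illustrative.
java
// inside main(), after info1 and info2 are created
SingleOutputStreamOperator<OrderInfo1> orderInfo1Stream = info1.map(line -> {
    String[] f = line.split(",");
    return new OrderInfo1(Long.valueOf(f[0]), f[1], Double.valueOf(f[2]));
});
SingleOutputStreamOperator<OrderInfo2> orderInfo2Stream = info2.map(line -> {
    String[] f = line.split(",");
    OrderInfo2 o = new OrderInfo2();
    o.setOrderId(Long.valueOf(f[0]));
    o.setOrderDate(f[1]);
    o.setAddress(f[2]);
    return o;
});

orderInfo1Stream
        .connect(orderInfo2Stream)
        .keyBy("orderId", "orderId")           // key both sides by the order ID
        .flatMap(new EnrichOrderFunction())
        .print();
env.execute("OrderStream");

// nested in OrderStream:
public static class EnrichOrderFunction
        extends RichCoFlatMapFunction<OrderInfo1, OrderInfo2, Tuple2<OrderInfo1, OrderInfo2>> {

    // keyed state: one slot per order ID for each side of the join
    private ValueState<OrderInfo1> info1State;
    private ValueState<OrderInfo2> info2State;

    @Override
    public void open(Configuration parameters) {
        info1State = getRuntimeContext().getState(new ValueStateDescriptor<>("info1", OrderInfo1.class));
        info2State = getRuntimeContext().getState(new ValueStateDescriptor<>("info2", OrderInfo2.class));
    }

    @Override
    public void flatMap1(OrderInfo1 value, Collector<Tuple2<OrderInfo1, OrderInfo2>> out) throws Exception {
        OrderInfo2 other = info2State.value();
        if (other != null) {
            // the other side already arrived: emit the joined pair and clean up
            info2State.clear();
            out.collect(Tuple2.of(value, other));
        } else {
            // wait for the matching OrderInfo2
            info1State.update(value);
        }
    }

    @Override
    public void flatMap2(OrderInfo2 value, Collector<Tuple2<OrderInfo1, OrderInfo2>> out) throws Exception {
        OrderInfo1 other = info1State.value();
        if (other != null) {
            info1State.clear();
            out.collect(Tuple2.of(other, value));
        } else {
            info2State.update(value);
        }
    }
}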
4.2 State backend
4.2.1 Overview
State backends supported by Flink:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
4.2.2 MemoryStateBackend
[Figure: MemoryStateBackend.png]
By default, state lives in the TaskManager's heap memory, and on a checkpoint it is copied into the JobManager's heap memory.
Drawbacks:
- can only hold small amounts of state
- state data may be lost
Advantages:
- convenient for development and testing
4.2.3 FSStateBackend
[Figure: FSStateBackend.png]
State lives in the TaskManager's heap memory; on a checkpoint it is written to a configured file system (HDFS, etc.).
Drawbacks:
- state size is limited by the TaskManager's memory (5 MB by default)
Advantages:
- fast state access
- state is not lost
Use for: production, including jobs with a fairly large amount of state.
4.2.4 RocksDBStateBackend
[Figure: RocksDBStateBackend.png]
State is stored in RocksDB (a key-value store) and ultimately kept in local files.
On a checkpoint, the state is copied to a configured file system (HDFS, etc.).
Drawbacks:
- state access is somewhat slower
Advantages:
- can hold very large amounts of state
- state is not lost
Use for: production, especially jobs with very large state.
4.2.5 Configuring the State Backend
(1) Per-job configuration
Set it in the job code:
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
or new MemoryStateBackend()
or new RocksDBStateBackend(filebackend, true); (requires an additional third-party dependency)
(2) Cluster-wide configuration
Edit flink-conf.yaml:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
Note: state.backend can take the following values: jobmanager (MemoryStateBackend), filesystem (FsStateBackend), rocksdb (RocksDBStateBackend).
4.3 checkpoint
4.3.1 Checkpoint Overview
(1) To make state fault tolerant, Flink checkpoints it.
(2) Checkpointing is the core of Flink's fault-tolerance mechanism. It periodically takes snapshots of the state of every operator/task in the stream, based on the configured interval, and persists them. If the program crashes unexpectedly, it can be restarted from a chosen snapshot, correcting any data inconsistencies caused by the failure.
(3) For Flink's checkpoint mechanism to work with persistent storage for streams and state, two prerequisites must hold:
- a persistent source that can replay events for a certain period of time, typically a persistent message queue (Apache Kafka, RabbitMQ, ...) or a file system (HDFS, S3, GFS, ...)
- persistent storage for the state, typically a distributed file system (HDFS, S3, GFS, ...)
Taking a snapshot
Restoring from a snapshot
4.3.2 Checkpoint Configuration
Checkpointing is disabled by default and must be enabled explicitly. Once enabled, the checkpoint mode can be Exactly-once or At-least-once; the default is Exactly-once, which is the right choice for most applications. At-least-once may be useful for applications that need ultra-low latency (consistently a few milliseconds).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms (the checkpoint interval)
env.enableCheckpointing(1000);
// advanced options:
// set the mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure at least 500 ms pass between checkpoints (minimum pause between checkpoints)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints must complete within one minute or they are discarded (checkpoint timeout)
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// retain checkpoint data after the job is cancelled, so the job can later be restored from a chosen checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
4.5 Recovering Data
4.5.1 Restart Strategy Overview
Flink supports different restart strategies that control how a job is restarted after a failure. The cluster starts with a default restart strategy, which is used whenever no job-specific strategy is defined. If a restart strategy is passed when the job is submitted, it overrides the cluster default. The cluster-wide default is set in flink-conf.yaml via the restart-strategy configuration parameter.
Commonly used restart strategies:
(1) Fixed delay
(2) Failure rate
(3) No restart
If checkpointing is not enabled, the no-restart strategy is used.
If checkpointing is enabled but no restart strategy is configured, the fixed-delay strategy is used with Integer.MAX_VALUE restart attempts. The restart strategy can be configured globally in flink-conf.yaml or set programmatically per job, in which case the job setting overrides the global one.
4.5.2 重啟策略
Fixed delay
Option 1: global configuration in flink-conf.yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s
Option 2: set in the application code
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay between attempts
));
Failure rate
Option 1: global configuration in flink-conf.yaml
restart-strategy: failure-rate
restart-strategy.failure-rate.max-failures-per-interval: 3
restart-strategy.failure-rate.failure-rate-interval: 5 min
restart-strategy.failure-rate.delay: 10 s
Option 2: set in the application code
env.setRestartStrategy(RestartStrategies.failureRateRestart(
3, // maximum number of failures within the interval
Time.of(5, TimeUnit.MINUTES), // the interval over which failures are measured
Time.of(10, TimeUnit.SECONDS) // delay between attempts
));
No restart
Option 1: global configuration in flink-conf.yaml
restart-strategy: none
Option 2: set in the application code
env.setRestartStrategy(RestartStrategies.noRestart());
4.5.3 Retaining Multiple Checkpoints
By default, when checkpointing is enabled Flink keeps only the most recent successful checkpoint, and a failed program can only be restored from that one. It is often more flexible to keep several checkpoints and choose which one to restore from; for example, if we discover that the last 4 hours of processing were wrong, we may want to roll the state back to 4 hours ago. Flink supports retaining multiple checkpoints; add the following to conf/flink-conf.yaml to set the maximum number of checkpoints to keep:
state.checkpoints.num-retained: 20
After this is set, you can list the retained checkpoint directories on HDFS:
hdfs dfs -ls hdfs://namenode:9000/flink/checkpoints
To roll back to a particular checkpoint, simply point the job at that checkpoint's path.
4.5.4 Restoring from a Checkpoint
If the Flink program fails, or recent data was processed incorrectly, we can restore the program from a chosen checkpoint:
bin/flink run -s hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar
Once the program is running again, it keeps checkpointing according to its checkpoint configuration.
Alternatively, the checkpoint directory can be configured in the job code itself, so that on the next start the job recovers its data automatically even if the code has changed.
4.5.5 Savepoint
With savepoints, a program can be upgraded and then continue computing from the point reached before the upgrade, so no data is lost.
A savepoint is a global, consistent snapshot. It stores source offsets, operator state, and so on, so an application can resume from any point in the past at which a savepoint was taken.
Checkpoint vs. savepoint
Checkpoint:
Triggered automatically on a schedule to persist state; expires over time; used internally when the application fails and restarts.
Savepoint:
Triggered manually by the user; a pointer to a checkpoint; never expires; used for upgrades.
Note: to upgrade smoothly between different versions of a job and between Flink versions, it is strongly recommended to assign IDs to operators manually via the uid(String) method. These IDs determine the scope of each operator's state. If operators are not given explicit IDs, Flink generates them automatically, and the program can be restored from a savepoint only as long as these IDs do not change. Automatically generated IDs depend on the program structure and are very sensitive to code changes, which is why setting IDs manually is strongly recommended.
Using savepoints
1. Configure the savepoint location in flink-conf.yaml
This is optional, but if it is set you do not have to specify the savepoint location on the command line when creating a savepoint for a job.
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2. Trigger a savepoint (directly, or while cancelling the job)
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId] (the -yid parameter is required in YARN mode)
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId] (the -yid parameter is required in YARN mode)
3. Start a job from a given savepoint
bin/flink run -s savepointPath [runArgs]
4.1 Requirement
Requirement: every 5 seconds, compute the number of occurrences of each word over the last 10 seconds.
[Figure: Window.png]
4.1.1 TimeWindow Implementation
java
/**
 * Every 5 seconds, count the occurrences of each word over the last 10 seconds
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0)
          .timeWindow(Time.seconds(10), Time.seconds(5))
          .sum(1);
        result.print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }
}
4.1.2 ProcessWindowFunction
java
/**
 * Every 5 seconds, count the occurrences of each word over the last 10 seconds
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0)
          .timeWindow(Time.seconds(10), Time.seconds(5))
          .process(new SumProcessWindowFunction());
        result.print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    /**
     * IN, OUT, KEY, W
     * IN:  input data type
     * OUT: output data type
     * KEY: key type (with positional keys, Flink represents the key as a Tuple)
     * W:   window type
     */
    public static class SumProcessWindowFunction extends
            ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        FastDateFormat dataFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called when a window fires
         * @param tuple    key
         * @param context  operator context
         * @param elements all elements of the window
         * @param out      collector for output
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements,
                            Collector<Tuple2<String, Integer>> out) {
            System.out.println("current system time: " + dataFormat.format(System.currentTimeMillis()));
            System.out.println("window processing time: " + dataFormat.format(context.currentProcessingTime()));
            System.out.println("window start time: " + dataFormat.format(context.window().getStart()));
            System.out.println("window end time: " + dataFormat.format(context.window().getEnd()));
            int sum = 0;
            for (Tuple2<String, Integer> ele : elements) {
                sum += 1;
            }
            // emit the word count
            out.collect(Tuple2.of(tuple.getField(0), sum));
        }
    }
}
First enter:
hive
then enter: hive,hbase
Output:
java
current system time: 15:10:30
window processing time: 15:10:30
window start time: 15:10:20
window end time: 15:10:30
(hive,1)
current system time: 15:10:35
window processing time: 15:10:35
window start time: 15:10:25
window end time: 15:10:35
current system time: 15:10:35
window processing time: 15:10:35
window start time: 15:10:25
window end time: 15:10:35
(hbase,1)
(hive,1)
With a window of 10 seconds sliding every 5 seconds, Flink divides time into the following windows:
java
[00:00:00, 00:00:05) [00:00:05, 00:00:10)
[00:00:10, 00:00:15) [00:00:15, 00:00:20)
[00:00:20, 00:00:25) [00:00:25, 00:00:30)
[00:00:30, 00:00:35) [00:00:35, 00:00:40)
[00:00:40, 00:00:45) [00:00:45, 00:00:50)
[00:00:50, 00:00:55) [00:00:55, 00:01:00)
[00:01:00, 00:01:05) …
4.1.3 Types of Time
For the time attached to stream data, three notions can be distinguished:
Event Time: the time at which the event was produced, usually described by a timestamp inside the event.
Ingestion Time: the time at which the event enters Flink.
Processing Time: the current system time of the machine processing the event.
Example:
A raw log line:
2018-10-10 10:00:01,134 INFO executor.Executor: Finished task in state 0.0
This record enters Flink at 2018-10-10 20:00:00,102
and reaches the window operator at 2018-10-10 20:00:01,100.
2018-10-10 10:00:01,134 is the event time
2018-10-10 20:00:00,102 is the ingestion time
2018-10-10 20:00:01,100 is the processing time
Question:
If we want to count, per minute, the number of error logs from failed API calls, which notion of time makes sense to use?
4.2 Processing Time Window (ordered)
Requirement: every 5 seconds, compute the number of occurrences of each word over the last 10 seconds.
A custom source simulates the input: at second 13 it sends 2 events back to back, and at second 16 it sends 1 more event.
[Figure: 自定義source-TimeWindow.png]
java
/**
 * Every 5 seconds, count the occurrences of each word over the last 10 seconds
 */
public class TimeWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStream = env.addSource(new TestSouce());
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }).keyBy(0)
          .timeWindow(Time.seconds(10), Time.seconds(5))
          .process(new SumProcessWindowFunction());
        result.print().setParallelism(1);
        env.execute("TimeWindowWordCount");
    }

    public static class TestSouce implements SourceFunction<String> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // wait until the current time is roughly at a multiple of 10 seconds before sending events
            String currTime = String.valueOf(System.currentTimeMillis());
            while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
                currTime = String.valueOf(System.currentTimeMillis());
                continue;
            }
            System.out.println("time events start being sent: " + dateFormat.format(System.currentTimeMillis()));
            // send two events at second 13
            TimeUnit.SECONDS.sleep(13);
            ctx.collect("hadoop," + System.currentTimeMillis());
            // a second event is produced right away (in the next section this one is delayed by the network)
            ctx.collect("hadoop," + System.currentTimeMillis());
            // send one event at second 16
            TimeUnit.SECONDS.sleep(3);
            ctx.collect("hadoop," + System.currentTimeMillis());
            TimeUnit.SECONDS.sleep(300);
        }

        @Override
        public void cancel() {
        }
    }

    /**
     * IN, OUT, KEY, W
     * IN:  input data type
     * OUT: output data type
     * KEY: key type (with positional keys, Flink represents the key as a Tuple)
     * W:   window type
     */
    public static class SumProcessWindowFunction extends
            ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow> {
        FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

        /**
         * Called when a window fires
         * @param tuple    key
         * @param context  operator context
         * @param elements all elements of the window
         * @param out      collector for output
         */
        @Override
        public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements,
                            Collector<Tuple2<String, Integer>> out) {
            // System.out.println("current system time: " + dateFormat.format(System.currentTimeMillis()));
            // System.out.println("window processing time: " + dateFormat.format(context.currentProcessingTime()));
            // System.out.println("window start time: " + dateFormat.format(context.window().getStart()));
            // System.out.println("window end time: " + dateFormat.format(context.window().getEnd()));
        }
    }
}
Output:
java
time events start being sent: 16:16:40
(hadoop,2)
(1573287413001,1)
(1573287413015,1)
(hadoop,3)
(1573287416016,1)
(1573287413001,1)
(1573287413015,1)
(hadoop,1)
(1573287416016,1)
[Figure: 自定義source-TimeWindow2.png]
4.3 Processing Time Window (out of order)
The custom source now simulates out-of-order events: at second 13 two events are produced back to back, but only one of them is actually sent at second 13; because of a network issue the other one is not sent until second 19. At second 16 one more event is sent.
java
/**
-
每隔5秒計算最近10秒單詞出現的次數
result.print().setParallelism(1);env.execute("TimeWindowWordCount");
*/
public class TimeWindowWordCount {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource dataStream = env.addSource(new TestSouce());
SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] fields = line.split(",");
for (String word : fields) {
out.collect(new Tuple2<>(word, 1));
}
}
}).keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.process(new SumProcessWindowFunction());}
/**
-
模擬:第 13 秒的時候連續發送 2 個事件,第 16 秒的時候再發送 1 個事件
TimeUnit.SECONDS.sleep(300);
*/
public static class TestSouce implements SourceFunction{
FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");
@Override
public void run(SourceContext ctx) throws Exception {
    // wait until the current time is roughly at a multiple of 10 seconds before sending events
    String currTime = String.valueOf(System.currentTimeMillis());
    while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
        currTime = String.valueOf(System.currentTimeMillis());
        continue;
    }
    System.out.println("time events start being sent: " + dateFormat.format(System.currentTimeMillis()));
    // send two events at second 13
    TimeUnit.SECONDS.sleep(13);
    ctx.collect("hadoop," + System.currentTimeMillis());
    // the second event is produced now, but because of a network issue it is not sent yet
    String event = "hadoop," + System.currentTimeMillis();
    // send one event at second 16
    TimeUnit.SECONDS.sleep(3);
    ctx.collect("hadoop," + System.currentTimeMillis());
    // the delayed event is finally sent at second 19
    TimeUnit.SECONDS.sleep(3);
    ctx.collect(event);
}
@Override
public void cancel() {}
}
/**
- IN, OUT, KEY, W
- IN:輸入的數據類型
- OUT:輸出的數據類型
- Key:key的數據類型(在Flink里面,String用Tuple表示)
- W:Window的數據類型
/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Integer>,Tuple2<String,Integer>,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
/*- 當一個window觸發計算的時候會調用這個方法
- @param tuple key
- @param context operator的上下文
- @param elements 指定window的所有元素
- @param out 用戶輸出
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Integer>> elements,
Collector<Tuple2<String, Integer>> out) {
-
// System.out.println(“當天系統的時間:”+dateFormat.format(System.currentTimeMillis()));
//
// System.out.println(“Window的處理時間:”+dateFormat.format(context.currentProcessingTime()));
// System.out.println(“Window的開始時間:”+dateFormat.format(context.window().getStart()));
// System.out.println(“Window的結束時間:”+dateFormat.format(context.window().getEnd()));
}
Output:
java
time events start being sent: 16:18:50
(hadoop,1)
(1573287543001,1)
(1573287543001,1)
(hadoop,3)
(1573287546016,1)
(1573287543016,1)
(1573287546016,1)
(hadoop,2)
(1573287543016,1)
[Figure: 自定義source-TimeWindow-無序.png]
4.4 Handling Out-of-Order Events with Event Time
Processing with event time:
java
/**
-
每隔5秒計算最近10秒單詞出現的次數
env.execute("TimeWindowWordCount");
*/
public class TimeWindowWordCount {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// step 1: set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource dataStream = env.addSource(new TestSouce());
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
// step 2: extract the event time from the data
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);}
public static class TestSouce implements SourceFunction{
TimeUnit.SECONDS.sleep(300);}@Overridepublic void cancel() {}
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
@Override
public void run(SourceContext ctx) throws Exception {
// 控制大約在 10 秒的倍數的時間點發送事件
String currTime = String.valueOf(System.currentTimeMillis());
while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
currTime = String.valueOf(System.currentTimeMillis());
continue;
}
System.out.println("time events start being sent: " + dateFormat.format(System.currentTimeMillis()));
// send two events at second 13
TimeUnit.SECONDS.sleep(13);
ctx.collect("hadoop," + System.currentTimeMillis());
// the second event is produced now, but because of a network issue it is not sent yet
String event = "hadoop," + System.currentTimeMillis();
// send one event at second 16
TimeUnit.SECONDS.sleep(3);
ctx.collect("hadoop," + System.currentTimeMillis());
// the delayed event is finally sent at second 19
TimeUnit.SECONDS.sleep(3);
ctx.collect(event);}
/**
- IN, OUT, KEY, W
- IN:輸入的數據類型
- OUT:輸出的數據類型
- Key:key的數據類型(在Flink里面,String用Tuple表示)
- W:Window的數據類型
/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,Tuple2<String,Integer>,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
/*- 當一個window觸發計算的時候會調用這個方法
- @param tuple key
- @param context operator的上下文
- @param elements 指定window的所有元素
- @param out 用戶輸出
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector<Tuple2<String, Integer>> out) {
// System.out.println(“當天系統的時間:”+dateFormat.format(System.currentTimeMillis()));
//
// System.out.println(“Window的處理時間:”+dateFormat.format(context.currentProcessingTime()));
// System.out.println(“Window的開始時間:”+dateFormat.format(context.window().getStart()));
// System.out.println(“Window的結束時間:”+dateFormat.format(context.window().getEnd()));
}
Output:
java
time events start being sent: 16:44:10
(hadoop,1)
(hadoop,3)
(hadoop,1)
[Figure: 用EventTime處理無序的數據.png]
Now the result of the third window is computed correctly, but the problem is still not completely solved. To finish the job we need the watermark mechanism, introduced next.
4.5 Handling Out-of-Order Events with Watermarks
[Figure: 使用waterMark機制處理無序的數據.png]
java
/**
-
每隔5秒計算最近10秒單詞出現的次數
env.execute("TimeWindowWordCount");
*/
public class TimeWindowWordCount {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//步驟一:設置時間類型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStreamSource dataStream = env.addSource(new TestSouce());
dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {
@Override
public Tuple2<String, Long> map(String line) throws Exception {
String[] fields = line.split(",");
return new Tuple2<>(fields[0],Long.valueOf(fields[1]));
}
//步驟二:獲取數據里面的event Time
}).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(10), Time.seconds(5))
.process(new SumProcessWindowFunction())
.print().setParallelism(1);}
public static class TestSouce implements SourceFunction{
TimeUnit.SECONDS.sleep(300);}@Overridepublic void cancel() {}
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
@Override
public void run(SourceContext ctx) throws Exception {
// 控制大約在 10 秒的倍數的時間點發送事件
String currTime = String.valueOf(System.currentTimeMillis());
while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
currTime = String.valueOf(System.currentTimeMillis());
continue;
}
System.out.println("time events start being sent: " + dateFormat.format(System.currentTimeMillis()));
// send two events at second 13
TimeUnit.SECONDS.sleep(13);
ctx.collect("hadoop," + System.currentTimeMillis());
// the second event is produced now, but because of a network issue it is not sent yet
String event = "hadoop," + System.currentTimeMillis();
// send one event at second 16
TimeUnit.SECONDS.sleep(3);
ctx.collect("hadoop," + System.currentTimeMillis());
// the delayed event is finally sent at second 19
TimeUnit.SECONDS.sleep(3);
ctx.collect(event);}
/**
- IN, OUT, KEY, W
- IN:輸入的數據類型
- OUT:輸出的數據類型
- Key:key的數據類型(在Flink里面,String用Tuple表示)
- W:Window的數據類型
/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,Tuple2<String,Integer>,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
/*- 當一個window觸發計算的時候會調用這個方法
- @param tuple key
- @param context operator的上下文
- @param elements 指定window的所有元素
- @param out 用戶輸出
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector<Tuple2<String, Integer>> out) {
// System.out.println(“當天系統的時間:”+dateFormat.format(System.currentTimeMillis()));
//
// System.out.println(“Window的處理時間:”+dateFormat.format(context.currentProcessingTime()));
// System.out.println(“Window的開始時間:”+dateFormat.format(context.window().getStart()));
// System.out.println(“Window的結束時間:”+dateFormat.format(context.window().getEnd()));
}
Output:
java
time events start being sent: 16:57:40
(hadoop,2)
(hadoop,3)
(hadoop,1)
The results are now correct!
4.6 The Watermark Mechanism
4.6.1 The Watermark Generation Period
java
/**
-
每隔5秒計算最近10秒單詞出現的次數
DataStreamSource<String> dataStream = env.addSource(new TestSouce()); dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {@Overridepublic Tuple2<String, Long> map(String line) throws Exception {String[] fields = line.split(",");return new Tuple2<>(fields[0],Long.valueOf(fields[1]));}//步驟二:獲取數據里面的event Time}).assignTimestampsAndWatermarks(new EventTimeExtractor() ).keyBy(0).timeWindow(Time.seconds(10), Time.seconds(5)).process(new SumProcessWindowFunction()).print().setParallelism(1);env.execute("TimeWindowWordCount");
*/
public class TimeWindowWordCount {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// step 1: set the time characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// set the watermark generation period to 1 s
env.getConfig().setAutoWatermarkInterval(1000);}
public static class TestSouce implements SourceFunction{
TimeUnit.SECONDS.sleep(300);}@Overridepublic void cancel() {}
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
@Override
public void run(SourceContext ctx) throws Exception {
// 控制大約在 10 秒的倍數的時間點發送事件
String currTime = String.valueOf(System.currentTimeMillis());
while (Integer.valueOf(currTime.substring(currTime.length() - 4)) > 100) {
currTime = String.valueOf(System.currentTimeMillis());
continue;
}
System.out.println("time events start being sent: " + dateFormat.format(System.currentTimeMillis()));
// send two events at second 13
TimeUnit.SECONDS.sleep(13);
ctx.collect("hadoop," + System.currentTimeMillis());
// the second event is produced now, but because of a network issue it is not sent yet
String event = "hadoop," + System.currentTimeMillis();
// send one event at second 16
TimeUnit.SECONDS.sleep(3);
ctx.collect("hadoop," + System.currentTimeMillis());
// the delayed event is finally sent at second 19
TimeUnit.SECONDS.sleep(3);
ctx.collect(event);}
/**
-
IN, OUT, KEY, W
-
IN:輸入的數據類型
-
OUT:輸出的數據類型
-
Key:key的數據類型(在Flink里面,String用Tuple表示)
-
W:Window的數據類型
/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,Tuple2<String,Integer>,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
/*- 當一個window觸發計算的時候會調用這個方法
- @param tuple key
- @param context operator的上下文
- @param elements 指定window的所有元素
- @param out 用戶輸出
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector<Tuple2<String, Integer>> out) {
int sum = 0;
for (Tuple2<String, Long> ele : elements) {
sum += 1;
}
// emit the word count
out.collect(Tuple2.of(tuple.getField(0), sum));
}
}
private static class EventTimeExtractor
        implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
    FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

    // called once for every record: extract its event time
    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        return element.f1;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        /**
         * Watermarks are generated periodically, every 200 ms by default.
         * The period was set to 1000 ms via
         * env.getConfig().setAutoWatermarkInterval(1000);
         */
        // make windows fire 5 seconds late
        System.out.println("water mark...");
        return new Watermark(System.currentTimeMillis() - 5000);
    }
}
}
Output:
java
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
time events start being sent: 17:10:50
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
water mark…
(hadoop,2)
water mark…
water mark…
water mark…
water mark…
water mark…
(hadoop,3)
water mark…
water mark…
water mark…
water mark…
water mark…
(hadoop,1)
water mark…
water mark…
water mark…
water mark…
water mark…
4.6.2 What Is a Watermark?
How do we handle out-of-order data when using event time?
Between the moment an event is produced and the moment it flows through the source and reaches an operator there is always some delay. In most cases events arrive at the operators in the order in which they were produced, but network delays and similar issues can cause out-of-order arrival; with Kafka in particular, order cannot be guaranteed across partitions. When computing a window we cannot wait forever, so we need a mechanism that guarantees the window is triggered after some specific amount of time. That mechanism is the watermark: watermarks exist to handle out-of-order events.
Watermarks in an ordered stream
Watermarks in an out-of-order stream
Watermarks with multiple parallel source instances
4.6.3 Requirement
Every 3 seconds, collect and print all events with the same key from the previous 3 seconds.
Code:
java
/**
-
得到并打印每隔 3 秒鐘統計前 3 秒內的相同的 key 的所有的事件
DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);dataStream.map(new MapFunction<String, Tuple2<String,Long>>() {@Overridepublic Tuple2<String, Long> map(String line) throws Exception {String[] fields = line.split(",");return new Tuple2<>(fields[0],Long.valueOf(fields[1]));}//步驟二:獲取數據里面的event Time}).assignTimestampsAndWatermarks(new EventTimeExtractor() ).keyBy(0).timeWindow(Time.seconds(3)).process(new SumProcessWindowFunction()).print().setParallelism(1);env.execute("TimeWindowWordCount");
*/
public class WaterMarkWindowWordCount {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//步驟一:設置時間類型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置waterMark產生的周期為1s
env.getConfig().setAutoWatermarkInterval(1000);}
/**
-
IN, OUT, KEY, W
-
IN:輸入的數據類型
-
OUT:輸出的數據類型
-
Key:key的數據類型(在Flink里面,String用Tuple表示)
-
W:Window的數據類型
/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,String,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
/*-
當一個window觸發計算的時候會調用這個方法
-
@param tuple key
-
@param context operator的上下文
-
@param elements 指定window的所有元素
-
@param out 用戶輸出
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector out) {
System.out.println("processing time: " + dateFormat.format(context.currentProcessingTime()));
System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
List<String> list = new ArrayList<>();
for (Tuple2<String, Long> ele : elements) {
list.add(ele.toString() + “|” + dateFormat.format(ele.f1));
}
out.collect(list.toString());
System.out.println("window end time : " + dateFormat.format(context.window().getEnd()));
}
} -
private static class EventTimeExtractor
        implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
    FastDateFormat dateFormat = FastDateFormat.getInstance("HH:mm:ss");

    private long currentMaxEventTime = 0L;
    private long maxOutOfOrderness = 10000; // maximum allowed out-of-orderness: 10 seconds

    // called once for every record: extract its event time
    @Override
    public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
        long currentElementEventTime = element.f1;
        currentMaxEventTime = Math.max(currentMaxEventTime, currentElementEventTime);
        System.out.println("event = " + element
                + "|" + dateFormat.format(element.f1)                             // event time
                + "|" + dateFormat.format(currentMaxEventTime)                    // max event time so far
                + "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // current watermark
        return currentElementEventTime;
    }

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        /**
         * Watermarks are generated periodically, every 200 ms by default.
         * The period was set to 1000 ms via
         * env.getConfig().setAutoWatermarkInterval(1000);
         */
        System.out.println("water mark...");
        return new Watermark(currentMaxEventTime - maxOutOfOrderness);
    }
}
}
Test data:
java
-- when does the window computation fire?
000001,1461756862000
000001,1461756866000
000001,1461756872000
000001,1461756873000
000001,1461756874000
000001,1461756876000
000001,1461756877000
Enter the records one at a time.
4.6.4 When Does a Window Fire?
Working through the test data above, the pattern is: a window [start, end) fires once the watermark, i.e. the maximum event time seen so far minus the allowed out-of-orderness (10 seconds here), reaches the window's end time.
4.6.5 Watermark + Window: Handling Out-of-Order Events
Input:
java
000001,1461756879000
000001,1461756871000
000001,1461756883000
4.6.6 Events That Arrive Too Late
Discard them (the default)
Restart the program and test again.
Input:
java
000001,1461756870000
000001,1461756883000
000001,1461756870000
000001,1461756871000
000001,1461756872000
Records that arrive too late are simply discarded.
Allowing an extra lateness
java
).assignTimestampsAndWatermarks(new EventTimeExtractor() )
.keyBy(0)
.timeWindow(Time.seconds(3))
.allowedLateness(Time.seconds(2)) // allow events to be up to 2 seconds late
.process(new SumProcessWindowFunction())
.print().setParallelism(1);
Input:
java
000001,1461756870000
000001,1461756883000
000001,1461756870000
000001,1461756871000
000001,1461756872000
000001,1461756884000
000001,1461756870000
000001,1461756871000
000001,1461756872000
000001,1461756885000
000001,1461756870000
000001,1461756871000
000001,1461756872000
Collecting the late data
/**
 * Every 3 seconds, collect and print all events with the same key from the previous 3 seconds.
 * Events that arrive too late are collected through a side output instead of being dropped.
 */
public class WaterMarkWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // step 1: set the time characteristic
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // set the watermark generation period to 1 s
        env.getConfig().setAutoWatermarkInterval(1000);

        // tag for late records that would otherwise be dropped
        OutputTag<Tuple2<String, Long>> outputTag =
                new OutputTag<Tuple2<String, Long>>("late-date"){};
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        SingleOutputStreamOperator<String> result = dataStream.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String line) throws Exception {
                String[] fields = line.split(",");
                return new Tuple2<>(fields[0], Long.valueOf(fields[1]));
            }
            // step 2: extract the event time from the data
        }).assignTimestampsAndWatermarks(new EventTimeExtractor())
          .keyBy(0)
          .timeWindow(Time.seconds(3))
          // .allowedLateness(Time.seconds(2)) // allow events to be up to 2 seconds late
          .sideOutputLateData(outputTag) // collect events that are too late
          .process(new SumProcessWindowFunction());
        // print the normal results
        result.print();
        // fetch the records that arrived too late
        DataStream<String> lateDataStream = result.getSideOutput(outputTag)
                .map(new MapFunction<Tuple2<String, Long>, String>() {
                    @Override
                    public String map(Tuple2<String, Long> stringLongTuple2) throws Exception {
                        return "late record: " + stringLongTuple2.toString();
                    }
                });
        lateDataStream.print();
        env.execute("TimeWindowWordCount");
    }
/**
-
IN, OUT, KEY, W
-
IN:輸入的數據類型
-
OUT:輸出的數據類型
-
Key:key的數據類型(在Flink里面,String用Tuple表示)
-
W:Window的數據類型
/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,String,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
/*-
當一個window觸發計算的時候會調用這個方法
-
@param tuple key
-
@param context operator的上下文
-
@param elements 指定window的所有元素
-
@param out 用戶輸出
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector out) {
System.out.println("processing time: " + dateFormat.format(context.currentProcessingTime()));
System.out.println("window start time : " + dateFormat.format(context.window().getStart()));
List<String> list = new ArrayList<>();
for (Tuple2<String, Long> ele : elements) {
list.add(ele.toString() + “|” + dateFormat.format(ele.f1));
}
out.collect(list.toString());
System.out.println("window end time : " + dateFormat.format(context.window().getEnd()));
}
} -
private static class EventTimeExtractor
private long currentMaxEventTime = 0L;private long maxOutOfOrderness = 10000; // 最大允許的亂序時間 10 秒// 拿到每一個事件的 Event Time@Overridepublic long extractTimestamp(Tuple2<String, Long> element,long previousElementTimestamp) {long currentElementEventTime = element.f1;currentMaxEventTime = Math.max(currentMaxEventTime, currentElementEventTime);System.out.println("event = " + element+ "|" + dateFormat.format(element.f1) // Event Time+ "|" + dateFormat.format(currentMaxEventTime) // Max Event Time+ "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermarkreturn currentElementEventTime;}@Nullable@Overridepublic Watermark getCurrentWatermark() {/*** WasterMark會周期性的產生,默認就是每隔200毫秒產生一個** 設置 watermark 產生的周期為 1000ms* env.getConfig().setAutoWatermarkInterval(1000);*/System.out.println("water mark...");return new Watermark(currentMaxEventTime - maxOutOfOrderness);}
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);}
} -
Input:
java
000001,1461756870000
000001,1461756883000
the following records arrive late:
000001,1461756870000
000001,1461756871000
000001,1461756872000
4.7 Watermarks with Multiple Parallel Instances
A window operator may receive watermarks from several upstream instances; it uses the smallest of them.
/**
-
得到并打印每隔 3 秒鐘統計前 3 秒內的相同的 key 的所有的事件
-
測試多并行度
// 保存遲到的,會被丟棄的數據OutputTag<Tuple2<String, Long>> outputTag =new OutputTag<Tuple2<String, Long>>("late-date"){};DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);SingleOutputStreamOperator<String> result = dataStream.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String line) throws Exception {String[] fields = line.split(",");return new Tuple2<>(fields[0], Long.valueOf(fields[1]));}//步驟二:獲取數據里面的event Time}).assignTimestampsAndWatermarks(new EventTimeExtractor()).keyBy(0).timeWindow(Time.seconds(3))// .allowedLateness(Time.seconds(2)) // 允許事件遲到 2 秒.sideOutputLateData(outputTag) // 保存遲到太多的數據.process(new SumProcessWindowFunction());//打印正常的數據result.print();//獲取遲到太多的數據DataStream<String> lateDataStream= result.getSideOutput(outputTag).map(new MapFunction<Tuple2<String, Long>, String>() {@Overridepublic String map(Tuple2<String, Long> stringLongTuple2) throws Exception {return "遲到的數據:" + stringLongTuple2.toString();}});lateDataStream.print();env.execute("TimeWindowWordCount");
*/
public class WaterMarkWindowWordCount {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set the parallelism to 2
env.setParallelism(2);
//步驟一:設置時間類型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置waterMark產生的周期為1s
env.getConfig().setAutoWatermarkInterval(1000);}
/**
-
IN, OUT, KEY, W
-
IN:輸入的數據類型
-
OUT:輸出的數據類型
-
Key:key的數據類型(在Flink里面,String用Tuple表示)
-
W:Window的數據類型
/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,String,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
/*-
當一個window觸發計算的時候會調用這個方法
-
@param tuple key
-
@param context operator的上下文
-
@param elements 指定window的所有元素
-
@param out 用戶輸出
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector out) {
System.out.println(“處理時間:” + dateFormat.format(context.currentProcessingTime()));
System.out.println("window start time : " + dateFormat.format(context.window().getStart()));List list = new ArrayList<>();
for (Tuple2<String, Long> ele : elements) {
list.add(ele.toString() + “|” + dateFormat.format(ele.f1));
}
out.collect(list.toString());
System.out.println("window end time : " + dateFormat.format(context.window().getEnd()));
}
} -
private static class EventTimeExtractor
private long currentMaxEventTime = 0L;private long maxOutOfOrderness = 10000; // 最大允許的亂序時間 10 秒// 拿到每一個事件的 Event Time@Overridepublic long extractTimestamp(Tuple2<String, Long> element,long previousElementTimestamp) {long currentElementEventTime = element.f1;currentMaxEventTime = Math.max(currentMaxEventTime, currentElementEventTime);//打印線程long id = Thread.currentThread().getId();System.out.println("當前線程ID:"+id+"event = " + element+ "|" + dateFormat.format(element.f1) // Event Time+ "|" + dateFormat.format(currentMaxEventTime) // Max Event Time+ "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermarkreturn currentElementEventTime;}@Nullable@Overridepublic Watermark getCurrentWatermark() {/*** WasterMark會周期性的產生,默認就是每隔200毫秒產生一個** 設置 watermark 產生的周期為 1000ms* env.getConfig().setAutoWatermarkInterval(1000);*/System.out.println("water mark...");return new Watermark(currentMaxEventTime - maxOutOfOrderness);}
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);}
} -
Input:
java
000001,1461756870000
000001,1461756883000
000001,1461756888000
Output:
current thread ID: 55, event = (000001,1461756883000)|19:34:43|19:34:43|19:34:33
water mark…
current thread ID: 56, event = (000001,1461756870000)|19:34:30|19:34:30|19:34:20
water mark…
water mark…
water mark…
current thread ID: 56, event = (000001,1461756888000)|19:34:48|19:34:48|19:34:38
water mark…
water mark…
processing time: 19:31:25
window start time : 19:34:30
2> [(000001,1461756870000)|19:34:30]
window end time : 19:34:33
Thread 56 has produced two watermarks, 20 and 38; 38 replaces 20, so thread 56's watermark is 38.
Thread 55's watermark is 33. The operator takes the smaller of 33 and 38, i.e. 33, as its watermark, which triggers the [30, 33) window; that window contains the record (000001,1461756870000).
4.8 How Watermarks Are Generated
/**
-
得到并打印每隔 3 秒鐘統計前 3 秒內的相同的 key 的所有的事件
-
有條件的產生watermark
// 保存遲到的,會被丟棄的數據OutputTag<Tuple2<String, Long>> outputTag =new OutputTag<Tuple2<String, Long>>("late-date"){};DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);SingleOutputStreamOperator<String> result = dataStream.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String line) throws Exception {String[] fields = line.split(",");return new Tuple2<>(fields[0], Long.valueOf(fields[1]));}//步驟二:獲取數據里面的event Time}).assignTimestampsAndWatermarks(new EventTimeExtractor()).keyBy(0).timeWindow(Time.seconds(3))// .allowedLateness(Time.seconds(2)) // 允許事件遲到 2 秒.sideOutputLateData(outputTag) // 保存遲到太多的數據.process(new SumProcessWindowFunction());//打印正常的數據result.print();//獲取遲到太多的數據DataStream<String> lateDataStream= result.getSideOutput(outputTag).map(new MapFunction<Tuple2<String, Long>, String>() {@Overridepublic String map(Tuple2<String, Long> stringLongTuple2) throws Exception {return "遲到的數據:" + stringLongTuple2.toString();}});lateDataStream.print();env.execute("TimeWindowWordCount");
*/
public class WaterMarkWindowWordCount {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//把并行度設置為2
env.setParallelism(2);
//步驟一:設置時間類型
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//設置waterMark產生的周期為1s
env.getConfig().setAutoWatermarkInterval(1000);}
/**
-
IN, OUT, KEY, W
-
IN:輸入的數據類型
-
OUT:輸出的數據類型
-
Key:key的數據類型(在Flink里面,String用Tuple表示)
-
W:Window的數據類型
/
public static class SumProcessWindowFunction extends
ProcessWindowFunction<Tuple2<String,Long>,String,Tuple,TimeWindow> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);
/*-
當一個window觸發計算的時候會調用這個方法
-
@param tuple key
-
@param context operator的上下文
-
@param elements 指定window的所有元素
-
@param out 用戶輸出
*/
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<String, Long>> elements,
Collector out) {
System.out.println(“處理時間:” + dateFormat.format(context.currentProcessingTime()));
System.out.println("window start time : " + dateFormat.format(context.window().getStart()));List list = new ArrayList<>();
for (Tuple2<String, Long> ele : elements) {
list.add(ele.toString() + “|” + dateFormat.format(ele.f1));
}
out.collect(list.toString());
System.out.println("window end time : " + dateFormat.format(context.window().getEnd()));
}
} -
/**
 * Generating watermarks on a condition (punctuated watermarks)
 */
private static class EventTimeExtractor2
        implements AssignerWithPunctuatedWatermarks<Tuple2<String, Long>> {

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement,
                                              long extractedTimestamp) {
        // called for every event received
        // the watermark is emitted on a condition, not periodically
        if ("000002".equals(lastElement.f0)) {
            // emit a watermark
            return new Watermark(lastElement.f1 - 10000);
        }
        // returning null means no watermark is emitted
        return null;
    }

    @Override
    public long extractTimestamp(Tuple2<String, Long> element,
                                 long previousElementTimestamp) {
        return element.f1;
    }
}
private static class EventTimeExtractor
private long currentMaxEventTime = 0L;private long maxOutOfOrderness = 10000; // 最大允許的亂序時間 10 秒// 拿到每一個事件的 Event Time@Overridepublic long extractTimestamp(Tuple2<String, Long> element,long previousElementTimestamp) {long currentElementEventTime = element.f1;currentMaxEventTime = Math.max(currentMaxEventTime, currentElementEventTime);long id = Thread.currentThread().getId();System.out.println("當前線程ID:"+id+"event = " + element+ "|" + dateFormat.format(element.f1) // Event Time+ "|" + dateFormat.format(currentMaxEventTime) // Max Event Time+ "|" + dateFormat.format(getCurrentWatermark().getTimestamp())); // Current Watermarkreturn currentElementEventTime;}@Nullable@Overridepublic Watermark getCurrentWatermark() {/*** WasterMark會周期性的產生,默認就是每隔200毫秒產生一個** 設置 watermark 產生的周期為 1000ms* env.getConfig().setAutoWatermarkInterval(1000);*** 和事件關系不大* 1. watermark 值依賴處理時間的場景* 2. 當有一段時間沒有接收到事件,但是仍然需要產生 watermark 的場景*/System.out.println("water mark...");return new Watermark(currentMaxEventTime - maxOutOfOrderness);}
implements AssignerWithPeriodicWatermarks<Tuple2<String, Long>> {
FastDateFormat dateFormat = FastDateFormat.getInstance(“HH:mm:ss”);}
} -
4.1 Window Overview
Aggregating events (for example counting or summing) works differently on streams than in batch processing. It is impossible, for instance, to count all elements in a stream, because streams are usually infinite (unbounded). Aggregations on streams therefore have to be scoped by a window, such as "the count over the last 5 minutes" or "the sum of the last 100 elements". A window is a mechanism for cutting an unbounded stream into finite chunks of data.
Windows can be time-driven (Time Window, e.g. every 30 seconds) or data-driven (Count Window, e.g. every 100 elements).
4.2 Window Types
Windows are commonly divided into the following types:
tumbling windows: no overlap
sliding windows: overlapping
session windows
global windows: no windowing
4.2.1 Tumbling Windows (no overlap)
4.2.2 Sliding Windows (overlapping)
4.2.3 Session Windows
Requirement: count each word in real time; once a word has not appeared for 5 seconds, emit its count.
Example: see below.
[Figure: session-windows.svg]
4.2.4 Global Windows
Example: see below.
[Figure: non-windowed.svg]
4.2.5 Window Type Summary
Keyed Window vs. Non-Keyed Window
/**
 * Non-Keyed Window vs. Keyed Window
 */
public class WindowType {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        // non-keyed stream:
        // AllWindowedStream<Tuple2<String, Integer>, TimeWindow> nonkeyedStream = stream.timeWindowAll(Time.seconds(3));
        // nonkeyedStream.sum(1)
        //         .print();
    }
}
[Figure: window的類型2.png]
[Figure: window的類型.png]
TimeWindow
CountWindow
Custom windows
The first two kinds of windows usually cover the business scenarios we run into; I have yet to encounter a case that required a custom window.
4.3 window操作
Keyed Windows
stream
.keyBy(…) <- keyed versus non-keyed windows
.window(…) <- required: “assigner”
[.trigger(…)] <- optional: “trigger” (else default trigger)
[.evictor(…)] <- optional: “evictor” (else no evictor)
[.allowedLateness(…)] <- optional: “lateness” (else zero)
[.sideOutputLateData(…)] <- optional: “output tag” (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: “function”
[.getSideOutput(…)] <- optional: “output tag”
Non-Keyed Windows
java
stream
.windowAll(…) <- required: “assigner”
[.trigger(…)] <- optional: “trigger” (else default trigger)
[.evictor(…)] <- optional: “evictor” (else no evictor)
[.allowedLateness(…)] <- optional: “lateness” (else zero)
[.sideOutputLateData(…)] <- optional: “output tag” (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: “function”
[.getSideOutput(…)] <- optional: “output tag”
4.3.1 Window Functions
Tumbling window and sliding window
java
// tumbling window
stream.keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(2)))
      .sum(1)
      .print();
// sliding window
stream.keyBy(0)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(6), Time.seconds(4)))
      .sum(1)
      .print();
session window
java
/**
 * Once a word has not appeared for 5 seconds, emit its count
 */
public class SessionWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);
        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        stream.keyBy(0)
              .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
              .sum(1)
              .print();
        env.execute("SessionWindowTest");
    }
}
global window
global window + trigger 一起配合才能使用
需求:單詞每出現三次統計一次
java
/**
 * 需求:單詞每出現三次統計一次
 */
public class GlobalWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        stream.keyBy(0)
                .window(GlobalWindows.create())
                // 如果不加 trigger,GlobalWindow 永遠不會觸發計算,程序不會有任何輸出
                .trigger(CountTrigger.of(3))
                .sum(1)
                .print();

        env.execute("GlobalWindowTest");
    }
}
執行結果:
java
hello,3
hello,6
hello,9
總結:效果和 countWindow(3) 很像,但并不相同:countWindow(3) 每個窗口只統計最近 3 條數據,觸發后窗口里的數據會被清掉,所以每次輸出都是 3;而 GlobalWindow + CountTrigger 只觸發計算、不清空窗口,所以每次輸出都會累計之前的次數。
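作為對照,下面給出用內置 countWindow(3) 的寫法(示意代碼,假設輸入流 stream 和上文一樣是 Tuple2<單詞, 1>):
java
// 對照示例:countWindow(3) 每湊滿 3 條數據觸發一次計算,觸發后清空窗口內的數據
stream.keyBy(0)
        .countWindow(3)
        .sum(1)
        .print();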
4.3.2 Trigger
需求:自定義一個CountWindow
java
/**
 * 使用自定義 Trigger 實現一個類似 CountWindow 的效果
 */
public class CountWindowWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        // 自定義 Trigger:每個 key 每攢夠 3 條數據觸發一次計算
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
                .window(GlobalWindows.create())
                .trigger(new MyCountTrigger(3)); // 可以對比 CountTrigger 的源碼,跟我們寫的很像

        // 內置寫法,效果等價:
        // WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
        //         .window(GlobalWindows.create())
        //         .trigger(CountTrigger.of(3));

        keyedWindow.sum(1).print();
        env.execute("CountWindowWordCount");
    }
}
注:效果跟CountWindow一模一樣
4.3.3 Evictor
需求:實現每隔2個單詞,計算最近3個單詞
java
/**
 * 使用 Evictor 自己實現一個類似 CountWindow(3,2) 的效果
 * 每隔 2 個單詞計算最近 3 個單詞
 */
public class CountWindowWordCountByEvictor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
                .window(GlobalWindows.create())
                .trigger(new MyCountTrigger(2))     // 每來 2 條數據觸發一次計算
                .evictor(new MyCountEvictor(3));    // 計算之前只保留窗口中最近的 3 條數據

        DataStream<Tuple2<String, Integer>> wordCounts = keyedWindow.sum(1);
        wordCounts.print().setParallelism(1);

        env.execute("Streaming WordCount");
    }

    private static class MyCountTrigger
            extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
        // 表示指定的元素的最大的數量
        private long maxCount;

        // 用于存儲每個 key 對應的 count 值
        private ReducingStateDescriptor<Long> stateDescriptor
                = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long aLong, Long t1) throws Exception {
                return aLong + t1;
            }
        }, Long.class);

        public MyCountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        /**
         * 當一個元素進入到一個 window 中的時候就會調用這個方法
         * @param element 元素
         * @param timestamp 進來的時間
         * @param window 元素所屬的窗口
         * @param ctx 上下文
         * @return TriggerResult
         *   1. TriggerResult.CONTINUE:表示對 window 不做任何處理
         *   2. TriggerResult.FIRE:表示觸發 window 的計算
         *   3. TriggerResult.PURGE:表示清除 window 中的所有數據
         *   4. TriggerResult.FIRE_AND_PURGE:表示先觸發 window 計算,然后刪除 window 中的數據
         */
        @Override
        public TriggerResult onElement(Tuple2<String, Integer> element,
                                       long timestamp,
                                       GlobalWindow window,
                                       TriggerContext ctx) throws Exception {
            // 拿到當前 key 對應的 count 狀態值
            ReducingState<Long> count = ctx.getPartitionedState(stateDescriptor);
            // count 累加 1
            count.add(1L);
            // 如果當前 key 的 count 值等于 maxCount,就觸發 window 計算
            if (count.get() == maxCount) {
                count.clear();
                return TriggerResult.FIRE;
            }
            // 否則,對 window 不做任何的處理
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time,
                                              GlobalWindow window,
                                              TriggerContext ctx) throws Exception {
            // 寫基于 Processing Time 的定時器任務邏輯
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time,
                                         GlobalWindow window,
                                         TriggerContext ctx) throws Exception {
            // 寫基于 Event Time 的定時器任務邏輯
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            // 清除狀態值
            ctx.getPartitionedState(stateDescriptor).clear();
        }
    }

    private static class MyCountEvictor
            implements Evictor<Tuple2<String, Integer>, GlobalWindow> {
        // window 的大小
        private long windowCount;

        public MyCountEvictor(long windowCount) {
            this.windowCount = windowCount;
        }

        /**
         * 在 window 計算之前刪除特定的數據
         * @param elements window 中所有的元素
         * @param size window 中所有元素的大小
         * @param window window
         * @param evictorContext 上下文
         */
        @Override
        public void evictBefore(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements,
                                int size, GlobalWindow window, EvictorContext evictorContext) {
            if (size <= windowCount) {
                return;
            } else {
                int evictorCount = 0;
                Iterator<TimestampedValue<Tuple2<String, Integer>>> iterator = elements.iterator();
                while (iterator.hasNext()) {
                    iterator.next();
                    evictorCount++;
                    // 只保留最近的 windowCount 條數據:前 size - windowCount 條全部刪除
                    if (evictorCount > size - windowCount) {
                        break;
                    } else {
                        iterator.remove();
                    }
                }
            }
        }

        /**
         * 在 window 計算之后刪除特定的數據
         * @param elements window 中所有的元素
         * @param size window 中所有元素的大小
         * @param window window
         * @param evictorContext 上下文
         */
        @Override
        public void evictAfter(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements,
                               int size, GlobalWindow window, EvictorContext evictorContext) {
        }
    }
}
4.3.4 window增量聚合
窗口中每進入一條數據,就進行一次計算,等時間到了展示最后的結果
常用的聚合算子
java
reduce(reduceFunction)
aggregate(aggregateFunction)
sum(),min(),max()
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-YsoVwJU7-1638891999645)(assets/1573871695836.png)]
java
/**
 * 演示增量聚合:窗口內每來一條數據就調用一次 reduce
 */
public class SocketDemoIncrAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Integer> intDStream = dataStream.map(number -> Integer.valueOf(number));
        AllWindowedStream<Integer, TimeWindow> windowResult = intDStream.timeWindowAll(Time.seconds(10));
        windowResult.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer last, Integer current) throws Exception {
                // 每來一條數據就會打印一次,說明是增量聚合
                System.out.println("執行邏輯 " + last + " " + current);
                return last + current;
            }
        }).print();

        env.execute(SocketDemoIncrAgg.class.getSimpleName());
    }
}
aggregate算子
需求:求每個窗口里面的數據的平均值
java
/**
 * 求每個窗口中的數據的平均值
 */
public class aggregateWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("10.148.15.10", 8888);

        SingleOutputStreamOperator<Integer> numberStream = dataStream.map(line -> Integer.valueOf(line));
        AllWindowedStream<Integer, TimeWindow> windowStream = numberStream.timeWindowAll(Time.seconds(5));
        windowStream.aggregate(new MyAggregate())
                .print();

        env.execute("aggregateWindowTest");
    }

    /**
     * IN:輸入的數據類型
     * ACC:自定義的中間狀態,這里用 Tuple2<Integer, Integer> 表示
     *     f0:已經計算的數據的個數
     *     f1:已經計算的數據的總和
     * OUT:輸出的數據類型
     */
    private static class MyAggregate
            implements AggregateFunction<Integer, Tuple2<Integer, Integer>, Double> {

        /**
         * 初始化累加器
         */
        @Override
        public Tuple2<Integer, Integer> createAccumulator() {
            return new Tuple2<>(0, 0);
        }

        /**
         * 針對每條數據的操作
         */
        @Override
        public Tuple2<Integer, Integer> add(Integer element,
                                            Tuple2<Integer, Integer> accumulator) {
            // 個數 +1,總值累加
            return new Tuple2<>(accumulator.f0 + 1, accumulator.f1 + element);
        }

        @Override
        public Double getResult(Tuple2<Integer, Integer> accumulator) {
            return (double) accumulator.f1 / accumulator.f0;
        }

        @Override
        public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a1,
                                              Tuple2<Integer, Integer> b1) {
            return Tuple2.of(a1.f0 + b1.f0, a1.f1 + b1.f1);
        }
    }
}
4.3.5 window全量聚合
等屬于窗口的數據到齊,才開始進行聚合計算【可以實現對窗口內的數據進行排序等需求】
java
apply(windowFunction)
process(processWindowFunction)
processWindowFunction比windowFunction提供了更多的上下文信息。類似于map和RichMap的關系
效果圖
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ZcqyYNRP-1638891999645)(assets/1573877034053.png)]
java
/**
 * 全量聚合:窗口觸發時才對窗口內的全部數據做一次計算
 */
public class SocketDemoFullAgg {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Integer> intDStream = dataStream.map(number -> Integer.valueOf(number));
        AllWindowedStream<Integer, TimeWindow> windowResult = intDStream.timeWindowAll(Time.seconds(10));
        windowResult.process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<Integer> iterable, Collector<Integer> collector) throws Exception {
                // 窗口觸發時才會執行一次,說明是全量聚合
                System.out.println("執行計算邏輯");
                int count = 0;
                Iterator<Integer> numberIterator = iterable.iterator();
                while (numberIterator.hasNext()) {
                    Integer number = numberIterator.next();
                    count += number;
                }
                collector.collect(count);
            }
        }).print();

        env.execute("socketDemoFullAgg");
    }
}
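process 方式還可以通過 Context 拿到窗口本身的元數據(比如窗口的起止時間),這是 reduce/sum 這類增量聚合拿不到的。下面是一個示意性的小片段,假設復用上面的 windowResult,輸出格式是假設的:
java
// 示意:在全量聚合里順便輸出窗口的起止時間(僅作演示,不是上文案例的一部分)
windowResult.process(new ProcessAllWindowFunction<Integer, String, TimeWindow>() {
    @Override
    public void process(Context context, Iterable<Integer> iterable, Collector<String> collector) throws Exception {
        int count = 0;
        for (Integer number : iterable) {
            count += number;
        }
        // context.window() 可以拿到當前窗口對象,進而拿到窗口的開始和結束時間戳
        collector.collect("window[" + context.window().getStart() + "," + context.window().getEnd() + ") sum=" + count);
    }
}).print();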
4.3.6 window join
兩個window之間可以進行join,join操作只支持三種類型的window:滾動窗口,滑動窗口,會話窗口
使用方式:
java
stream.join(otherStream) //兩個流進行關聯
.where() //選擇第一個流的key作為關聯字段
.equalTo()//選擇第二個流的key作為關聯字段
.window()//設置窗口的類型
.apply() //對結果做操作
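上面模板里的 where()/equalTo() 需要傳入 KeySelector。下面給出一個假設性的完整小例子:假設兩個流的元素都是 Tuple2<String, Integer>,用 f0 作為關聯字段,流名和窗口大小都只是示意:
java
// 示意:兩個 Tuple2<String, Integer> 流按 f0 做滾動窗口 join
DataStream<Tuple2<String, Integer>> orderStream = ...;   // 假設的第一個流
DataStream<Tuple2<String, Integer>> payStream = ...;     // 假設的第二個流

orderStream.join(payStream)
        .where(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;  // 第一個流的關聯字段
            }
        })
        .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;  // 第二個流的關聯字段
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
        .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
            @Override
            public String join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
                return first + "," + second;
            }
        });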
Tumbling Window Join
java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
…
DataStream orangeStream = …
DataStream greenStream = …
orangeStream.join(greenStream)
.where()
.equalTo()
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-jVWbwv40-1638891999646)(assets/tumbling-window-join.svg)]
Sliding Window Join
java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
…
DataStream orangeStream = …
DataStream greenStream = …
orangeStream.join(greenStream)
.where()
.equalTo()
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-5p8AWQNq-1638891999646)(assets/sliding-window-join.svg)]
Session Window Join
java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
…
DataStream orangeStream = …
DataStream greenStream = …
orangeStream.join(greenStream)
.where()
.equalTo()
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-c8aeRAzr-1638891999647)(assets/session-window-join.svg)]
Interval Join
java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
…
DataStream orangeStream = …
DataStream greenStream = …
orangeStream
.keyBy()
.intervalJoin(greenStream.keyBy())
.between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        // 原文此處代碼不完整,下面補一個最簡單的 processElement 實現(示意)
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) throws Exception {
            out.collect(left + "," + right);
        }
    });
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-oXiXtmiy-1638891999647)(assets/interval-join.svg)]
五 、招聘要求介紹(5分鐘)
六 、總結(5分鐘)
深入淺出Flink-task
一 、課前準備
二 、課堂主題
了解TaskManager,slot,Task之間的關系
三 、課程目標
了解TaskManager,slot,Task之間的關系
四 、知識要點
4.1 flink基礎知識
4.1.1 Flink基本架構
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-JmWfMyc6-1638892018928)(assets/Flink架構.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-2JFAe959-1638892018929)(assets/yarn.png)]
4.1.2 概述
Flink 整個系統主要由兩個組件組成,分別為 JobManager 和 TaskManager,Flink 架構也遵循 Master - Slave 架構設計原則,JobManager 為 Master 節點,TaskManager 為 Worker (Slave)節點。
所有組件之間的通信都是借助于 Akka Framework,包括任務的狀態以及 Checkpoint 觸發等信息。
4.1.3 Client 客戶端
客戶端負責將任務提交到集群,與 JobManager 構建 Akka 連接,然后將任務提交到 JobManager,通過和 JobManager 之間進行交互獲取任務執行狀態。
客戶端提交任務可以采用 CLI 方式或者通過使用 Flink WebUI 提交,也可以在應用程序中指定 JobManager 的 RPC 網絡端口構建 ExecutionEnvironment 提交 Flink 應用。
4.1.4 JobManager
JobManager 負責整個 Flink 集群任務的調度以及資源的管理,從客戶端中獲取提交的應用,然后根據集群中 TaskManager 上 TaskSlot 的使用情況,為提交的應用分配相應的 TaskSlot 資源并命令 TaskManager 啟動從客戶端中獲取的應用。
JobManager 相當于整個集群的 Master 節點,且整個集群有且只有一個活躍的 JobManager ,負責整個集群的任務管理和資源管理。
JobManager 和 TaskManager 之間通過 Actor System 進行通信,獲取任務執行的情況并通過 Actor System 將應用的任務執行情況發送給客戶端。
同時在任務執行的過程中,Flink JobManager 會觸發 Checkpoint 操作,每個 TaskManager 節點 收到 Checkpoint 觸發指令后,完成 Checkpoint 操作,所有的 Checkpoint 協調過程都是在 Fink JobManager 中完成。
當任務完成后,Flink 會將任務執行的信息反饋給客戶端,并且釋放掉 TaskManager 中的資源以供下一次提交任務使用。
4.1.5 TaskManager
TaskManager 相當于整個集群的 Slave 節點,負責具體的任務執行和對應任務在每個節點上的資源申請和管理。
客戶端通過將編寫好的 Flink 應用編譯打包,提交到 JobManager,然后 JobManager 會根據已注冊在 JobManager 中 TaskManager 的資源情況,將任務分配給有資源的 TaskManager節點,然后啟動并運行任務。
TaskManager 從 JobManager 接收需要部署的任務,然后使用 Slot 資源啟動 Task,建立數據接入的網絡連接,接收數據并開始數據處理。同時 TaskManager 之間的數據交互都是通過數據流的方式進行的。
可以看出,Flink 的任務運行其實是采用多線程的方式,這和 MapReduce 多 JVM 進行的方式有很大的區別,Flink 能夠極大提高 CPU 使用效率,在多個任務和 Task 之間通過 TaskSlot 方式共享系統資源,每個 TaskManager 中通過管理多個 TaskSlot 資源池進行對資源進行有效管理。
4.2 TaskManager 與 Slot
4.2.1 Slot
Flink 的每個 TaskManager 為集群提供 slot。slot 的數量通常與每個 TaskManager 節點的可用 CPU 核數成比例,一般情況下 slot 數就設置為該節點的 CPU 核數。
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-FAbuFPkJ-1638892018930)(assets/1574472572814.png)]
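對應到配置上,slot 數由 flink-conf.yaml 里的 taskmanager.numberOfTaskSlots 決定(數值僅為示意):
java
# flink-conf.yaml:每個 TaskManager 提供的 slot 數,通常設置為該節點的 CPU 核數
taskmanager.numberOfTaskSlots: 4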
4.2.2 并行度
一個Flink程序由多個任務組成(source、transformation和 sink)。 一個任務由多個并行的實例(線程)來執行, 一個任務的并行實例(線程)數目就被稱為該任務的并行度。
4.2.3 并行度的設置
一個任務的并行度設置可以從多個層次指定
?Operator Level(算子層次)
?Execution Environment Level(執行環境層次)
?Client Level(客戶端層次)
?System Level(系統層次)
算子層次
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-dNnqxNMx-1638892018930)(assets/1574472860477.png)]
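圖片中的示例代碼無法顯示,這里補一個示意:算子層次的并行度直接在某個算子后面調用 setParallelism(),只影響這一個算子(MyFlatMapFunction 為假設的函數名):
java
// 算子層次:只影響調用 setParallelism 的這個算子
data.flatMap(new MyFlatMapFunction())
        .setParallelism(2)   // 該 flatMap 算子的并行度為 2
        .keyBy(0)
        .sum(1)
        .setParallelism(4);  // 該 sum 算子的并行度為 4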
執行環境層次
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Wf9ebwHm-1638892018930)(assets/1574472880358.png)]
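執行環境層次的并行度在創建 StreamExecutionEnvironment 之后統一設置,作為該作業所有未單獨指定并行度的算子的默認值(示意):
java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 執行環境層次:該作業中沒有單獨指定并行度的算子都使用這個值
env.setParallelism(3);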
客戶端層次
并行度可以在客戶端將job提交到Flink時設定,對于CLI客戶端,可以通過-p參數指定并行度
java
./bin/flink run -p 10 WordCount.jar
系統層次
在系統級可以通過設置flink-conf.yaml文件中的parallelism.default屬性來指定所有執行環境的默認并行度
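flink-conf.yaml 中對應的配置示例(數值僅為示意):
java
# flink-conf.yaml:所有作業的默認并行度,優先級最低,會被上面三個層次覆蓋
parallelism.default: 10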
4.2.4 案例演示
并行度為1
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-bE3VWtKu-1638892018930)(assets/1574473161843.png)]
各種不同的并行度
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-LCXV7bwc-1638892018931)(assets/1574473199938.png)]
4.3 任務提交
4.3.1 把任務提交到yarn上
java
演示一:
flink run -m yarn-cluster -p 2 -yn 2 -yjm 1024 -ytm 1024 -c streaming.slot.lesson01.WordCount flinklesson-1.0-SNAPSHOT.jar
演示二:
flink run -m yarn-cluster -p 3 -yn 2 -yjm 1024 -ytm 1024 -c streaming.slot.lesson01.WordCount flinklesson-1.0-SNAPSHOT.jar
4.3.2 把任務提交到standalone集群
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-hsekYYlN-1638892018931)(assets/1574475633598.png)]
java
演示一:
flink run -c streaming.slot.lesson01.WordCount -p 2 flinklesson-1.0-SNAPSHOT.jar
演示二:
flink run -c streaming.slot.lesson01.WordCount -p 3 flinklesson-1.0-SNAPSHOT.jar
4.4 task
4.4.1 數據傳輸的方式
forward strategy
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ZhFyOrJE-1638892018931)(assets/1574478499283.png)]
key based strategy
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-IU21DFJr-1638892018932)(assets/1574478613578.png)]
數據需要按照某個屬性(我們稱為 key)進行分組(或者說分區)
相同 key 的數據需要傳輸給同一個 task,在一個 task 中進行處理
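對應到代碼里,key based strategy 就是 keyBy:相同 key 的數據會被按 hash 發往同一個下游 task(示意代碼,wordStream 為假設的 Tuple2<單詞, 1> 流):
java
// 示意:keyBy 按 hash 把相同單詞發往同一個下游 task
wordStream
        .keyBy(0)   // 按第一個字段(單詞)做 hash 分區
        .sum(1)
        .print();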
broadcast strategy
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-rtA7ZZR5-1638892018932)(assets/1574478760221.png)]
random strategy
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-EWdVIobK-1638892018932)(assets/1574478869374.png)]
4.4.2 Operator Chain
代碼:
java
public class WordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String topic = "testSlot";
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", "192.168.167.254:9092");
        consumerProperties.setProperty("group.id", "testSlot_consumer");
        // 原文后續代碼缺失:一般是用上面的配置創建 FlinkKafkaConsumer,再做 flatMap/keyBy/sum 并 execute
    }
}
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-CCFxCtzJ-1638892018933)(assets/1574479305528.png)]****
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-JmMkpdAT-1638892018933)(assets/1574479086824.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-PTRtnjOB-1638892018934)(assets/1574479147372.png)]
Operator Chain的條件:
并行度設置為1:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-EhGPsWoW-1638892018934)(assets/1574480303336.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-QVNAEQJ9-1638892018934)(assets/1574480321456.png)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-5nYVQkPK-1638892018934)(assets/1574480283016.png)]
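『Operator Chain 的條件』對應的圖片無法顯示,這里按常見結論簡單歸納(供參考):上下游算子并行度一致、下游算子只有上游這一個輸入、數據傳輸策略是 forward、上下游在同一個 slot sharing group 中,并且沒有禁用 chain。代碼層面可以用下面幾個 API 控制(示意):
java
// 全局禁用 operator chain
env.disableOperatorChaining();

// 從某個算子開始新的 chain(和前面的算子斷開)
dataStream.filter(value -> value != null)
        .startNewChain();

// 讓某個算子完全不參與 chain
dataStream.map(value -> value)
        .disableChaining();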
4.1.1 需求背景
針對算法產生的日志數據進行清洗拆分
?1:算法產生的日志數據是嵌套json格式,需要拆分打平
?2:針對算法中的國家字段進行大區轉換
?3:把數據回寫到Kafka
4.1.2 項目架構
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-YxmZsyh8-1638892042235)(assets/項目架構-1573899524477.png)]
視頻網站(抖音),生成日志的時候,他們日志里面是把多條數據合并成一條數據了。
4.1.3 方案設計
日志格式:
java
直播平臺(是不是國內,但是類似于國內的抖音)
處理前:
{“dt”:“2019-11-19 20:33:39”,“countryCode”:“TW”,“data”:[{“type”:“s1”,“score”:0.8,“level”:“D”},{“type”:“s2”,“score”:0.1,“level”:“B”}]}
kafka:
如何去評估存儲,10億,評估每條數據多大,50k-幾百k
我們公司里面還有幾百個topic,數據都是這樣的一個情況,所以我們有很多的實時任務都是進行ETL
處理后:
“dt”:“2019-11-19 20:33:39”,“countryCode”:“TW”,“type”:“s1”,“score”:0.8,“level”:“D”
“dt”:“2019-11-19 20:33:39”,“countryCode”:“TW”,“type”:“s2”,“score”:0.1,“level”:“B”
其實是需要我們處理成:
“dt”:“2019-11-19 20:33:39”,“area”:“AREA_CT”,“type”:“s1”,“score”:0.8,“level”:“D”
“dt”:“2019-11-19 20:33:39”,“area”:“AREA_CT”,“type”:“s2”,“score”:0.1,“level”:“B”
我們日志里面有地區,地區用的是編號,需要我們做ETL的時候順帶也要轉化一下。
如果用 Spark Streaming 怎么做?
1.讀取 redis 里面的數據,作為一個廣播變量
2.讀取 Kafka 里面的日志數據
3.在 flatMap 里把廣播變量傳進去,完成大區轉換。
如果是用flink又怎么做?
hset areas AREA_US US
hset areas AREA_CT TW,HK
hset areas AREA_AR PK,KW,SA
hset areas AREA_IN IN
flink -> redis -> k,v HashMap
US,AREA_US
TW,AREA_CT
HK,AREA_CT
IN,AREA_IN
{“dt”:“2019-11-19 20:33:41”,“countryCode”:“KW”,“data”:[{“type”:“s2”,“score”:0.2,“level”:“A”},{“type”:“s1”,“score”:0.2,“level”:“D”}]}
{“dt”:“2019-11-19 20:33:43”,“countryCode”:“HK”,“data”:[{“type”:“s5”,“score”:0.5,“level”:“C”},{“type”:“s2”,“score”:0.8,“level”:“B”}]}
redis碼表格式(元數據):
java
大區 國家
hset areas AREA_US US
hset areas AREA_CT TW,HK
hset areas AREA_AR PK,KW,SA
hset areas AREA_IN IN
操作:
java
HKEYS areas
HGETALL areas
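Flink 這邊拿到 areas 這個 hash 之后,需要把『大區 -> 國家列表』反轉成『國家 -> 大區』的 HashMap,這樣就能按 countryCode 直接查出大區。下面是一個基于 Jedis 的示意寫法(redis 地址為假設值):
java
// 示意:把 redis 中的 areas 反轉成 國家 -> 大區 的映射
Jedis jedis = new Jedis("192.168.167.254", 6379);
HashMap<String, String> countryToArea = new HashMap<>();
Map<String, String> areas = jedis.hgetAll("areas");   // 大區 -> "國家1,國家2"
for (Map.Entry<String, String> entry : areas.entrySet()) {
    String area = entry.getKey();                      // 例如 AREA_CT
    String[] countries = entry.getValue().split(",");  // 例如 TW,HK
    for (String country : countries) {
        countryToArea.put(country, area);              // TW -> AREA_CT
    }
}
jedis.close();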
4.2 實時報表
4.2.1 需求背景
主要針對直播/短視頻平臺審核指標的統計
?1:統計不同大區每1 min內過審(上架)視頻的數據量(單詞的個數)
? 分析一下:
? 統計的是大區,不同的大區,大區應該就是一個分組的字段,每分鐘(時間)的有效視頻(Process時間,事件的事件?)
?
每分鐘【1:事件時間 2:加上水位,這樣的話,我們可以挽救一些數據。3:收集數據延遲過多的數據】的不同大區的【有效視頻】的數量(單詞計數)
PM:產品經理
?2:統計不同大區每1 min內未過審(下架)的數據量
我們公司的是一個電商的平臺(京東,淘寶)
京東 -》 店主 -〉 上架商品 -》 通過審核了,可以上架了,有效商品數
每分鐘的不同主題的有效商品數。
【衣服】
【鞋】
【書】
【電子產品】
淘寶 -》 店主 -〉 上架商品 -》 未通過審核,下架 -〉 無效的商品數
4.2.2 項目架構
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-xMJyYLig-1638892042236)(assets/實時報表項目架構.png)]
4.2.3 方案設計
日志格式:
如果下一條數據就代表一個有效視頻。
統計的過去的一分鐘的每個大區的有效視頻數量
統計的過去的一分鐘的每個大區的,不同類型的有效視頻數量
統計的過去一分鐘是每個單詞出現次數。
java
{“dt”:“2019-11-20 15:09:43”,“type”:“child_unshelf”,“username”:“shenhe5”,“area”:“AREA_ID”}
{“dt”:“2019-11-20 15:09:44”,“type”:“chlid_shelf”,“username”:“shenhe2”,“area”:“AREA_ID”}
{“dt”:“2019-11-20 15:09:45”,“type”:“black”,“username”:“shenhe2”,“area”:“AREA_US”}
{“dt”:“2019-11-20 15:09:46”,“type”:“chlid_shelf”,“username”:“shenhe3”,“area”:“AREA_US”}
{“dt”:“2019-11-20 15:09:47”,“type”:“unshelf”,“username”:“shenhe3”,“area”:“AREA_ID”}
{“dt”:“2019-11-20 15:09:48”,“type”:“black”,“username”:“shenhe4”,“area”:“AREA_IN”}
pom文件:
java
<flink.version>1.9.0</flink.version>
<scala.version>2.11.8</scala.version>
-
-
kkbPro
com.kkb.flink
1.0-SNAPSHOT
4.0.0
ETL
-
-
org.apache.flink
flink-java
-
org.apache.flink
flink-streaming-java_2.11
-
org.apache.flink
flink-scala_2.11
-
org.apache.flink
flink-streaming-scala_2.11
-
org.apache.bahir
flink-connector-redis_2.11
-
org.apache.flink
flink-statebackend-rocksdb_2.11
-
org.apache.flink
flink-connector-kafka-0.11_2.11
-
org.apache.kafka
kafka-clients
-
org.slf4j
slf4j-api
-
org.slf4j
slf4j-log4j12
-
redis.clients
jedis
-
com.alibaba
fastjson
package com.kkb.core;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.kkb.source.KkbRedisSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.util.HashMap;
import java.util.Properties;
/**
-
數據清洗
//第一步:從Kafka里面讀取數據 消費者 數據源需要kafka//topic的取名還是有講究的,最好就是讓人根據這個名字就能知道里面有什么數據。//xxxx_xxx_xxx_xxxString topic="allData";Properties consumerProperties = new Properties();consumerProperties.put("bootstrap.servers","192.168.167.254:9092");consumerProperties.put("group.id","allTopic_consumer");/*** String topic, 主題* KafkaDeserializationSchema<T> deserializer,* Properties props*/FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic,new SimpleStringSchema(),consumerProperties);//{"dt":"2019-11-24 19:54:23","countryCode":"PK","data":[{"type":"s4","score":0.8,"level":"C"},{"type":"s5","score":0.2,"level":"C"}]}DataStreamSource<String> allData = env.addSource(consumer);//設置為廣播變量DataStream<HashMap<String, String>> mapData = env.addSource(new KkbRedisSource()).broadcast();SingleOutputStreamOperator<String> etlData = allData.connect(mapData).flatMap(new CoFlatMapFunction<String, HashMap<String, String>, String>() {HashMap<String, String> allMap = new HashMap<String, String>();//里面處理的是kafka的數據@Overridepublic void flatMap1(String line, Collector<String> out) throws Exception {JSONObject jsonObject = JSONObject.parseObject(line);String dt = jsonObject.getString("dt");String countryCode = jsonObject.getString("countryCode");//可以根據countryCode獲取大區的名字String area = allMap.get(countryCode);JSONArray data = jsonObject.getJSONArray("data");for (int i = 0; i < data.size(); i++) {JSONObject dataObject = data.getJSONObject(i);System.out.println("大區:"+area);dataObject.put("dt", dt);dataObject.put("area", area);//下游獲取到數據的時候,也就是一個json格式的數據out.collect(dataObject.toJSONString());}}//里面處理的是redis里面的數據@Overridepublic void flatMap2(HashMap<String, String> map,Collector<String> collector) throws Exception {System.out.println(map.toString());allMap = map;}});//ETL -> load kafkaetlData.print().setParallelism(1);/*** String topicId,* SerializationSchema<IN> serializationSchema,* Properties producerConfig)*/
*/
public class DataClean {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//我們是從Kafka里面讀取數據,所以這兒就是topic有多少個partition,那么就設置幾個并行度。
env.setParallelism(3);
env.enableCheckpointing(60000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//注釋,我們這兒其實需要設置state backed類型,我們要把checkpoint的數據存儲到
//rocksdb里面
// String outputTopic=“allDataClean”;
// Properties producerProperties = new Properties();
// producerProperties.put(“bootstrap.servers”,“192.168.167.254:9092”);
// FlinkKafkaProducer011 producer = new FlinkKafkaProducer011<>(outputTopic,
// new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
// producerProperties);
//
// //搞一個Kafka的生產者
// etlData.addSink(producer);
}
package com.kkb.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
/**
-
模擬數據源
*/
public class kafkaProducer {public static void main(String[] args) throws Exception{
//創建producer鏈接KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);//{"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}while(true){String message = "{\"dt\":\""+getCurrentTime()+"\",\"countryCode\":\""+getCountryCode()+"\",\"data\":[{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"},{\"type\":\""+getRandomType()+"\",\"score\":"+getRandomScore()+",\"level\":\""+getRandomLevel()+"\"}]}";System.out.println(message);//同步的方式,往Kafka里面生產數據producer.send(new ProducerRecord<String, String>(topic,message));Thread.sleep(2000);}//關閉鏈接//producer.close();
Properties prop = new Properties();
//指定kafka broker地址
prop.put(“bootstrap.servers”, “192.168.167.254:9092”);
//指定key value的序列化方式
prop.put(“key.serializer”, StringSerializer.class.getName());
prop.put(“value.serializer”, StringSerializer.class.getName());
//指定topic名稱
String topic = “allData”;}
public static String getCurrentTime(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // 注意用 yyyy,YYYY 是 week year,跨年時會出錯
return sdf.format(new Date());
}public static String getCountryCode(){
String[] types = {“US”,“TW”,“HK”,“PK”,“KW”,“SA”,“IN”};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}public static String getRandomType(){
String[] types = {“s1”,“s2”,“s3”,“s4”,“s5”};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}public static double getRandomScore(){
double[] types = {0.3,0.2,0.1,0.5,0.8};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}public static String getRandomLevel(){
String[] types = {“A”,“A+”,“B”,“C”,“D”};
Random random = new Random();
int i = random.nextInt(types.length);
return types[i];
}
}
package com.kkb.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.util.HashMap;
import java.util.Map;
/**
*
- hset areas AREA_US US
- hset areas AREA_CT TW,HK
- hset areas AREA_AR PK,KW,SA
- hset areas AREA_IN IN
- IN,AREA_IN
*/
public class KkbRedisSource implements SourceFunction<HashMap<String,String>> {
}
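原文中 KkbRedisSource 的類體缺失,下面按上文的碼表格式補一個示意實現(redis 地址、刷新間隔均為假設值),思路就是周期性地把 redis 里的 areas 反轉成 國家 -> 大區 的 HashMap 發往下游:
java
public class KkbRedisSource implements SourceFunction<HashMap<String, String>> {
    private static final Logger logger = LoggerFactory.getLogger(KkbRedisSource.class);

    private volatile boolean isRunning = true;
    private Jedis jedis;

    @Override
    public void run(SourceContext<HashMap<String, String>> ctx) throws Exception {
        jedis = new Jedis("192.168.167.254", 6379);   // 假設的 redis 地址
        while (isRunning) {
            try {
                HashMap<String, String> map = new HashMap<>();
                // areas: 大區 -> "國家1,國家2",反轉成 國家 -> 大區
                Map<String, String> areas = jedis.hgetAll("areas");
                for (Map.Entry<String, String> entry : areas.entrySet()) {
                    for (String country : entry.getValue().split(",")) {
                        map.put(country, entry.getKey());
                    }
                }
                if (!map.isEmpty()) {
                    ctx.collect(map);
                }
            } catch (JedisConnectionException e) {
                logger.error("redis 連接異常,重新建立連接", e);
                jedis = new Jedis("192.168.167.254", 6379);
            }
            Thread.sleep(60000);   // 假設每分鐘刷新一次碼表
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
        if (jedis != null) {
            jedis.close();
        }
    }
}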
<?xml version="1.0" encoding="UTF-8"?>-
-
kkbPro
com.kkb.flink
1.0-SNAPSHOT
4.0.0
Report
-
-
org.apache.flink
flink-java
-
org.apache.flink
flink-streaming-java_2.11
-
org.apache.flink
flink-scala_2.11
-
org.apache.flink
flink-streaming-scala_2.11
-
org.apache.bahir
flink-connector-redis_2.11
-
org.apache.flink
flink-statebackend-rocksdb_2.11
-
org.apache.flink
flink-connector-kafka-0.11_2.11
-
org.apache.kafka
kafka-clients
-
org.slf4j
slf4j-api
-
org.slf4j
slf4j-log4j12
-
redis.clients
jedis
-
com.alibaba
fastjson
package com.kkb.core;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.kkb.function.MySumFuction;
import com.kkb.watermark.MyWaterMark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
/**
-
ETL:對數據進行預處理
-
報表:就是要計算一些指標
*/
public class DataReport {public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);env.enableCheckpointing(60000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//env.setStateBackend(new RocksDBStateBackend(""));//設置timeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);String topic="auditLog";Properties consumerProperties = new Properties();consumerProperties.put("bootstrap.servers","192.168.167.254:9092");consumerProperties.put("group.id","auditLog_consumer");//讀取Kafka里面對數據//{"dt":"2019-11-24 21:19:47","type":"child_unshelf","username":"shenhe1","area":"AREA_ID"}FlinkKafkaConsumer011<String> consumer =new FlinkKafkaConsumer011<String>(topic,new SimpleStringSchema(),consumerProperties);DataStreamSource<String> data = env.addSource(consumer);Logger logger= LoggerFactory.getLogger(DataReport.class);//對數據進行處理SingleOutputStreamOperator<Tuple3<Long, String, String>> preData = data.map(new MapFunction<String, Tuple3<Long, String, String>>() {/*** Long:time* String: type* String: area* @return* @throws Exception*/@Overridepublic Tuple3<Long, String, String> map(String line) throws Exception {JSONObject jsonObject = JSON.parseObject(line);String dt = jsonObject.getString("dt");String type = jsonObject.getString("type");String area = jsonObject.getString("area");long time = 0;try {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");time = sdf.parse(dt).getTime();} catch (ParseException e) {logger.error("時間解析失敗,dt:" + dt, e.getCause());}return Tuple3.of(time, type, area);}});/***過濾無效的數據*/SingleOutputStreamOperator<Tuple3<Long, String, String>> filterData = preData.filter(tuple3 -> tuple3.f0 != 0);/*** 收集遲到太久的數據*/OutputTag<Tuple3<Long,String,String>> outputTag=new OutputTag<Tuple3<Long,String,String>>("late-date"){};/*** 進行窗口的統計操作* 統計的過去的一分鐘的每個大區的,不同類型的有效視頻數量*/SingleOutputStreamOperator<Tuple4<String, String, String, Long>> resultData = filterData.assignTimestampsAndWatermarks(new MyWaterMark()).keyBy(1, 2).window(TumblingEventTimeWindows.of(Time.seconds(30))).sideOutputLateData(outputTag).apply(new MySumFuction());/*** 收集到延遲太多的數據,業務里面要求寫到Kafka*/SingleOutputStreamOperator<String> sideOutput =//java8resultData.getSideOutput(outputTag).map(line -> line.toString());
// String outputTopic=“lateData”;
// Properties producerProperties = new Properties();
// producerProperties.put(“bootstrap.servers”,“192.168.167.254:9092”);
// FlinkKafkaProducer011 producer = new FlinkKafkaProducer011<>(outputTopic,
// new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
// producerProperties);
// sideOutput.addSink(producer);
}
package com.kkb.function;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
- IN,輸入的數據類型
- OUT,輸出的數據類型
- KEY,在flink里面這兒其實就是分組的字段,大家永遠看到的是一個tuple字段
- 只不過,如果你的分組的字段是有一個,那么這個tuple里面就只會有一個字段
- 如果說你的分組的字段有多個,那么這個里面就會有多個字段。
- W extends Window
*/
public class MySumFuction implements WindowFunction<Tuple3<Long,String,String>,
Tuple4<String,String,String,Long>,Tuple,TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow,
Iterable<Tuple3<Long, String, String>> input,
Collector<Tuple4<String, String, String, Long>> out) {
        // 獲取分組字段信息
        String type = tuple.getField(0).toString();
        String area = tuple.getField(1).toString();

        // 原文此處不完整:統計窗口內的條數,并把窗口結束時間格式化后一起輸出(示意補全)
        long count = 0;
        for (Tuple3<Long, String, String> ignored : input) {
            count++;
        }
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String time = sdf.format(new Date(timeWindow.getEnd()));

        out.collect(Tuple4.of(time, type, area, count));
    }
}
package com.kkb.source;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
/**
*
*/
public class ProducerDataReport {
}
package com.kkb.watermark;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
/**
*
*/
public class MyWaterMark
implements AssignerWithPeriodicWatermarks<Tuple3<Long,String,String>> {
}
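原文中 MyWaterMark 的實現缺失,下面按常見的周期性水位線寫法補一個示意(允許的最大亂序時間 10 秒為假設值):
java
public class MyWaterMark
        implements AssignerWithPeriodicWatermarks<Tuple3<Long, String, String>> {

    private final long maxOutOfOrderness = 10000L; // 假設允許 10 秒亂序
    private long currentMaxTimestamp = 0L;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // 水位線 = 當前看到的最大事件時間 - 允許的亂序時間
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Tuple3<Long, String, String> element, long previousElementTimestamp) {
        // Tuple3 的 f0 就是上游解析出來的事件時間
        currentMaxTimestamp = Math.max(currentMaxTimestamp, element.f0);
        return element.f0;
    }
}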
<?xml version="1.0" encoding="UTF-8"?>-
4.0.0
com.kkb.test
flinklesson
1.0-SNAPSHOT
-
<flink.version>1.9.0</flink.version>
<scala.version>2.11.8</scala.version>
-
-
org.apache.flink
flink-streaming-java_2.11
${flink.version}
-
org.apache.flink
flink-runtime-web_2.11
${flink.version}
-
org.apache.flink
flink-streaming-scala_2.11
${flink.version}
-
org.apache.flink
flink-statebackend-rocksdb_2.11
${flink.version}
-
joda-time
joda-time
2.7
-
org.apache.bahir
flink-connector-redis_2.11
1.0
-
org.apache.flink
flink-connector-kafka-0.11_2.11
${flink.version}
-
-
-
org.apache.maven.plugins
maven-compiler-plugin
3.1
-
1.81.8
-
/src/test/**
utf-8
-
net.alchim31.maven
scala-maven-plugin
3.2.0
-
-
compile-scala
compile
-
add-source
compile
-
test-compile-scala
test-compile
-
add-source
testCompile
-
${scala.version}
-
maven-assembly-plugin
-
-
jar-with-dependencies
-
-
make-assembly
package
-
single
package streaming.sink;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
-
把數據寫入redis
//創建redissinkRedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());l_wordsData.addSink(redisSink);env.execute("StreamingDemoToRedis");
*/
public class SinkForRedisDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource text = env.socketTextStream(“192.168.167.254”, 8888, “\n”);
//lpsuh l_words word
//對數據進行組裝,把string轉化為tuple2<String,String>
DataStream<Tuple2<String, String>> l_wordsData = text.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String value) throws Exception {
return new Tuple2<>(“b”, value);
}
});
//創建redis的配置
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(“192.168.167.254”).setPort(6379).build();}
public static class MyRedisMapper implements RedisMapper<Tuple2<String, String>> {
@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.LPUSH);}
//表示從接收的數據中獲取需要操作的redis key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//表示從接收的數據中獲取需要操作的redis value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}}
}
package com.atguigu.apitest.sinktest
import java.util
import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requests
/**
*
*
-
Project: FlinkTutorial
-
Package: com.atguigu.apitest.sinktest
-
Version: 1.0
-
Created by wushengran on 2019/9/17 16:27
*/
object EsSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)// source
val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
    )

    val httpHosts = new util.ArrayList[HttpHost]()
    httpHosts.add(new HttpHost("localhost", 9200))

    // 創建一個esSink 的builder
    val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading](
      httpHosts,
      new ElasticsearchSinkFunction[SensorReading] {
        override def process(element: SensorReading, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          println("saving data: " + element)
          // 包裝成一個Map或者JsonObject
          val json = new util.HashMap[String, String]()
          json.put("sensor_id", element.id)
          json.put("temperature", element.temperature.toString)
          json.put("ts", element.timestamp.toString)

          // 創建index request,準備發送數據
          val indexRequest = Requests.indexRequest().index("sensor").`type`("readingdata").source(json)
          // 利用index發送請求,寫入數據
          indexer.add(indexRequest)
          println("data saved.")
        }
      }
    )

    // sink
    dataStream.addSink( esSinkBuilder.build() )
    env.execute("es sink test")
}
}
package com.atguigu.apitest.sinktest
import java.sql.{Connection, DriverManager, PreparedStatement}
import com.atguigu.apitest.SensorReading
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
/**
*
*
-
Project: FlinkTutorial
-
Package: com.atguigu.apitest.sinktest
-
Version: 1.0
-
Created by wushengran on 2019/9/17 16:44
*/
object JdbcSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)// source
val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)// sink
dataStream.addSink( new MyJdbcSink() )env.execute(“jdbc sink test”)
}
}
class MyJdbcSink() extends RichSinkFunction[SensorReading]{
// 定義sql連接、預編譯器
var conn: Connection = _
var insertStmt: PreparedStatement = _
var updateStmt: PreparedStatement = _
// 初始化,創建連接和預編譯語句
override def open(parameters: Configuration): Unit = {
super.open(parameters)
conn = DriverManager.getConnection(“jdbc:mysql://localhost:3306/test”, “root”, “123456”)
insertStmt = conn.prepareStatement(“INSERT INTO temperatures (sensor, temp) VALUES (?,?)”)
updateStmt = conn.prepareStatement(“UPDATE temperatures SET temp = ? WHERE sensor = ?”)
}
// 調用連接,執行sql
override def invoke(value: SensorReading, context: SinkFunction.Context[_]): Unit = {
// 執行更新語句
updateStmt.setDouble(1, value.temperature)
updateStmt.setString(2, value.id)
updateStmt.execute()
// 如果update沒有查到數據,那么執行插入語句
if( updateStmt.getUpdateCount == 0 ){
insertStmt.setString(1, value.id)
insertStmt.setDouble(2, value.temperature)
insertStmt.execute()
}
}
// 關閉時做清理工作
override def close(): Unit = {
insertStmt.close()
updateStmt.close()
conn.close()
}
}
package com.atguigu.apitest.sinktest
import java.util.Properties
import com.atguigu.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}
/**
*
*
-
Project: FlinkTutorial
-
Package: com.atguigu.apitest.sinktest
-
Version: 1.0
-
Created by wushengran on 2019/9/17 15:43
*/
object KafkaSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)// source
// val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)
val properties = new Properties()
properties.setProperty(“bootstrap.servers”, “localhost:9092”)
properties.setProperty(“group.id”, “consumer-group”)
properties.setProperty(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
properties.setProperty(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”)
properties.setProperty(“auto.offset.reset”, “latest”)val inputStream = env.addSource(new FlinkKafkaConsumer011[String](“sensor”, new SimpleStringSchema(), properties))
// Transform操作
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble ).toString // 轉成String方便序列化輸出
}
)// sink
dataStream.addSink( new FlinkKafkaProducer011[String]( “sinkTest”, new SimpleStringSchema(), properties) )
dataStream.print()env.execute(“kafka sink test”)
}
}
package com.atguigu.apitest.sinktest
import com.atguigu.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}
/**
*
*
-
Project: FlinkTutorial
-
Package: com.atguigu.apitest.sinktest
-
Version: 1.0
-
Created by wushengran on 2019/9/17 16:12
*/
object RedisSinkTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)// source
val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)// transform
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)val conf = new FlinkJedisPoolConfig.Builder()
.setHost(“localhost”)
.setPort(6379)
.build()// sink
dataStream.addSink( new RedisSink(conf, new MyRedisMapper()) )env.execute(“redis sink test”)
}
}
class MyRedisMapper() extends RedisMapper[SensorReading]{
// 定義保存數據到redis的命令
override def getCommandDescription: RedisCommandDescription = {
// 把傳感器id和溫度值保存成哈希表 HSET key field value
new RedisCommandDescription( RedisCommand.HSET, “sensor_temperature” )
}
// 定義保存到redis的value
override def getValueFromData(t: SensorReading): String = t.temperature.toString
// 定義保存到redis的key
override def getKeyFromData(t: SensorReading): String = t.id
}
package com.atguigu.apitest
import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
*
*
-
Project: FlinkTutorial
-
Package: com.atguigu.apitest
-
Version: 1.0
-
Created by wushengran on 2019/8/24 10:14
*/
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.enableCheckpointing(60000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
env.getCheckpointConfig.setCheckpointTimeout(100000)
env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
// env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(100)
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)env.setRestartStrategy(RestartStrategies.failureRateRestart(3, org.apache.flink.api.common.time.Time.seconds(300), org.apache.flink.api.common.time.Time.seconds(10)))
// env.setStateBackend( new RocksDBStateBackend("") )
val stream = env.socketTextStream("localhost", 7777)val dataStream = stream.map(data => {val dataArray = data.split(",")SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble) }).assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) {override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000 } )val processedStream = dataStream.keyBy(_.id).process( new TempIncreAlert() )val processedStream2 = dataStream.keyBy(_.id)// .process( new TempChangeAlert(10.0) )
.flatMap( new TempChangeAlert(10.0) )
}
}
class TempIncreAlert() extends KeyedProcessFunction[String, SensorReading, String]{
// 定義一個狀態,用來保存上一個數據的溫度值
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double](“lastTemp”, classOf[Double]) )
// 定義一個狀態,用來保存定時器的時間戳
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState( new ValueStateDescriptor[Long](“currentTimer”, classOf[Long]) )
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, String]#Context, out: Collector[String]): Unit = {
// 先取出上一個溫度值
val preTemp = lastTemp.value()
// 更新溫度值
lastTemp.update( value.temperature )
}
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext, out: Collector[String]): Unit = {
// 輸出報警信息
out.collect( ctx.getCurrentKey + " 溫度連續上升" )
currentTimer.clear()
}
}
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)]{
private var lastTempState: ValueState[Double] = _
override def open(parameters: Configuration): Unit = {
// 初始化的時候聲明state變量
lastTempState = getRuntimeContext.getState(new ValueStateDescriptor[Double](“lastTemp”, classOf[Double]))
}
override def flatMap(value: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
// 獲取上次的溫度值
val lastTemp = lastTempState.value()
// 用當前的溫度值和上次的求差,如果大于閾值,輸出報警信息
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}
}
class TempChangeAlert2(threshold: Double) extends KeyedProcessFunction[String, SensorReading, (String, Double, Double)]{
// 定義一個狀態變量,保存上次的溫度值
lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState( new ValueStateDescriptor[Double](“lastTemp”, classOf[Double]) )
override def processElement(value: SensorReading, ctx: KeyedProcessFunction[String, SensorReading, (String, Double, Double)]#Context, out: Collector[(String, Double, Double)]): Unit = {
// 獲取上次的溫度值
val lastTemp = lastTempState.value()
// 用當前的溫度值和上次的求差,如果大于閾值,輸出報警信息
val diff = (value.temperature - lastTemp).abs
if(diff > threshold){
out.collect( (value.id, lastTemp, value.temperature) )
}
lastTempState.update(value.temperature)
}
}
package com.atguigu.apitest
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
*
*
-
Project: FlinkTutorial
-
Package: com.atguigu.apitest
-
Version: 1.0
-
Created by wushengran on 2019/8/24 11:16
*/
object SideOutputTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val stream = env.socketTextStream(“localhost”, 7777)
val dataStream = stream.map(data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
})
      .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) { // 延遲時間為假設值
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000
      } )

    val processedStream = dataStream
.process( new FreezingAlert() )
// dataStream.print(“input data”)
processedStream.print(“processed data”)
processedStream.getSideOutput( new OutputTag[String](“freezing alert”) ).print(“alert data”)
}
}
// 冰點報警,如果小于32F,輸出報警信息到側輸出流
class FreezingAlert() extends ProcessFunction[SensorReading, SensorReading]{
// lazy val alertOutput: OutputTag[String] = new OutputTag[String]( “freezing alert” )
override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
if( value.temperature < 32.0 ){
ctx.output( new OutputTag[String]( “freezing alert” ), "freezing alert for " + value.id )
}
out.collect( value )
}
}
package com.atguigu.apitest
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import scala.util.Random
/**
*
*
- Project: FlinkTutorial
- Package: com.atguigu.apitest
- Version: 1.0
- Created by wushengran on 2019/9/17 10:11
*/
// 定義傳感器數據樣例類
case class SensorReading( id: String, timestamp: Long, temperature: Double )
object SourceTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// env.fromElements(“flink”, 1, 32, 3213, 0.324).print(“test”)
// 2. 從文件中讀取數據 val stream2 = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt")// 3. 從kafka中讀取數據 // 創建kafka相關的配置 val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "consumer-group") properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") properties.setProperty("auto.offset.reset", "latest")val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))// 4. 自定義數據源 val stream4 = env.addSource(new SensorSource())// sink輸出 stream4.print("stream4")env.execute("source api test")}
}
class SensorSource() extends SourceFunction[SensorReading]{
// 定義一個flag:表示數據源是否還在正常運行
var running: Boolean = true
override def cancel(): Unit = running = false
override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
// 創建一個隨機數發生器
val rand = new Random()
}
}
package com.atguigu.apitest
import org.apache.flink.api.common.functions.{FilterFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
/**
*
*
-
Project: FlinkTutorial
-
Package: com.atguigu.apitest
-
Version: 1.0
-
Created by wushengran on 2019/9/17 11:41
*/
object TransformTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)// 讀入數據
val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)// Transform操作
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading( dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble )
}
)// 1. 聚合操作
val stream1 = dataStream
.keyBy(“id”)
// .sum(“temperature”)
.reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature + 10) )// 2. 分流,根據溫度是否大于30度劃分
val splitStream = dataStream
.split( sensorData => {
if( sensorData.temperature > 30 ) Seq(“high”) else Seq(“low”)
} )val highTempStream = splitStream.select(“high”)
val lowTempStream = splitStream.select(“low”)
val allTempStream = splitStream.select(“high”, “low”)// 3. 合并兩條流
val warningStream = highTempStream.map( sensorData => (sensorData.id, sensorData.temperature) )
val connectedStreams = warningStream.connect(lowTempStream)val coMapStream = connectedStreams.map(
warningData => ( warningData._1, warningData._2, “high temperature warning” ),
lowData => ( lowData.id, “healthy” )
)val unionStream = highTempStream.union(lowTempStream)
// 函數類
dataStream.filter( new MyFilter() ).print()// 輸出數據
// dataStream.print()
// highTempStream.print(“high”)
// lowTempStream.print(“low”)
// allTempStream.print(“all”)
// unionStream.print(“union”)env.execute(“transform test job”)
}
}
class MyFilter() extends FilterFunction[SensorReading]{
override def filter(value: SensorReading): Boolean = {
value.id.startsWith(“sensor_1”)
}
}
class MyMapper() extends RichMapFunction[SensorReading, String]{
override def map(value: SensorReading): String = {
“flink”
}
override def open(parameters: Configuration): Unit = super.open(parameters)
}
package com.atguigu.apitest
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
/**
*
*
-
Project: FlinkTutorial
-
Package: com.atguigu.apitest
-
Version: 1.0
-
Created by wushengran on 2019/9/18 9:31
*/
object WindowTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
// 設置事件時間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(500)// 讀入數據
// val inputStream = env.readTextFile(“D:\Projects\BigData\FlinkTutorial\src\main\resources\sensor.txt”)val inputStream = env.socketTextStream(“localhost”, 7777)
val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
      // .assignAscendingTimestamps(_.timestamp * 1000L)
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading]( Time.seconds(1) ) { // 延遲時間為假設值
        override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
      })
      // .assignTimestampsAndWatermarks( new MyAssigner() )
      .map(data => (data.id, data.temperature))
      .keyBy(_._1)
      // .process( new MyProcess() )
      .timeWindow(Time.seconds(10), Time.seconds(3))
      .reduce((result, data) => (data._1, result._2.min(data._2))) // 統計10秒內的最低溫度值

    dataStream.print()
env.execute(“window api test”)
}
}
class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading] {
  // fixed delay of 3 seconds
  val bound: Long = 3 * 1000L
  // largest timestamp seen so far
  var maxTs: Long = Long.MinValue

  override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    maxTs = maxTs.max(element.timestamp * 1000L)
    element.timestamp * 1000L
  }
}

class MyAssigner2() extends AssignerWithPunctuatedWatermarks[SensorReading] {
  val bound: Long = 1000L

  override def checkAndGetNextWatermark(lastElement: SensorReading, extractedTimestamp: Long): Watermark = {
    if (lastElement.id == "sensor_1") {
      new Watermark(extractedTimestamp - bound)
    } else {
      null
    }
  }

  override def extractTimestamp(element: SensorReading, previousElementTimestamp: Long): Long = {
    element.timestamp * 1000L
  }
}
package com.atguigu.wc

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

/**
  * Project: FlinkTutorial
  * Package: com.atguigu.wc
  * Version: 1.0
  * Created by wushengran on 2019/9/16 14:08
  */
object StreamWordCount {
  def main(args: Array[String]): Unit = {
    val params = ParameterTool.fromArgs(args)
    val host: String = params.get("host")
    val port: Int = params.getInt("port")

    // create a streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //    env.setParallelism(1)
    //    env.disableOperatorChaining()

    // receive the socket text stream
    val textDataStream = env.socketTextStream(host, port)

    // split each line into words and count them
    val wordCountDataStream = textDataStream.flatMap(_.split("\\s"))
      .filter(_.nonEmpty).startNewChain()
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // print the output
    wordCountDataStream.print().setParallelism(1)

    // run the job
    env.execute("stream word count job")
  }
}
package com.atguigu.wc

import org.apache.flink.api.scala._

/**
  * Project: FlinkTutorial
  * Package: com.atguigu.wc
  * Version: 1.0
  * Created by wushengran on 2019/9/16 11:48
  */
// batch-processing version
object WordCount {
  def main(args: Array[String]): Unit = {
    // create a batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
  }
}
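The batch listing above stops after creating the execution environment; the rest of the job was cut off in the original. Below is a minimal sketch of how a batch word count is typically completed on a `DataSet`. The object name `BatchWordCountSketch` and the input path are placeholders, not from the original.

```scala
import org.apache.flink.api.scala._

// minimal sketch of the full batch job; object name and input path are placeholders
object BatchWordCountSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read the text file as a DataSet[String]
    val inputDataSet = env.readTextFile("D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\wordcount.txt")

    // split lines into words, pair each word with 1, group by the word (field 0) and sum the counts (field 1)
    val wordCountDataSet = inputDataSet
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    wordCountDataSet.print()
  }
}
```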
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>FlinkTutorial</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>
    <artifactId>UserBehaviorAnalysis</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>HotItemsAnalysis</module>
        <module>NetworkFlowAnalysis</module>
        <module>MarketAnalysis</module>
        <module>LoginFailDetect</module>
        <module>OrderPayDetect</module>
    </modules>

    <properties>
        <flink.version>1.7.2</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>UserBehaviorAnalysis</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>OrderPayDetect</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
</project>
package com.atguigu.orderpay_detect

import java.util

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Project: UserBehaviorAnalysis
  * Package: com.atguigu.orderpay_detect
  * Version: 1.0
  * Created by wushengran on 2019/9/25 9:17
  */
// case class for the input order events
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)

// case class for the output results
case class OrderResult(orderId: Long, resultMsg: String)

object OrderTimeout {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
  }
}

// custom handler for timed-out event sequences
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult] {
  override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
    val timeoutOrderId = map.get("begin").iterator().next().orderId
    OrderResult(timeoutOrderId, "timeout")
  }
}

// custom handler for successfully paid event sequences
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult] {
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    val payedOrderId = map.get("follow").iterator().next().orderId
    OrderResult(payedOrderId, "payed successfully")
  }
}
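In the original, the `main` method of `OrderTimeout` stops after creating the environment. The following is a minimal wiring sketch (to be placed inside `main`, after the env setup) showing how the pattern and the two functions above are typically combined. The socket source on port 7777, the 15-minute `within` window, and the tag name `"orderTimeout"` are assumptions, not taken from the original.

```scala
// sketch only: source, 15-minute window and tag name are assumptions
val orderEventStream = env.socketTextStream("localhost", 7777)
  .map(data => {
    val dataArray = data.split(",")
    OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
  })
  .assignAscendingTimestamps(_.eventTime * 1000L)
  .keyBy(_.orderId)

// pattern: a "create" event followed by a "pay" event within 15 minutes
val orderPayPattern = Pattern.begin[OrderEvent]("begin").where(_.eventType == "create")
  .followedBy("follow").where(_.eventType == "pay")
  .within(Time.minutes(15))

// apply the pattern; matched sequences go to the main output, timed-out ones to the side output
val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")
val patternStream = CEP.pattern(orderEventStream, orderPayPattern)
val resultStream = patternStream.select(orderTimeoutOutputTag,
  new OrderTimeoutSelect(), new OrderPaySelect())

resultStream.print("payed")
resultStream.getSideOutput(orderTimeoutOutputTag).print("timeout")
env.execute("order timeout job")
```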
package com.atguigu.orderpay_detect

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * Project: UserBehaviorAnalysis
  * Package: com.atguigu.orderpay_detect
  * Version: 1.0
  * Created by wushengran on 2019/9/25 10:27
  */
object OrderTimeoutWithoutCep {
  // side-output tag for timed-out orders (the tag name was lost in the original formatting and is reconstructed here)
  val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    // note: the original listing is truncated here; orderEventStream is built the same way as in OrderTimeout
    //    val timeoutWarningStream = orderEventStream.process( new OrderTimeoutWarning() )
    val orderResultStream = orderEventStream.process(new OrderPayMatch())
  }

  class OrderPayMatch() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {
    // whether the pay event has arrived
    lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))
    // timestamp of the registered timer, kept as state
    lazy val timerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timer-state", classOf[Long]))
  }
}
// custom keyed process function
class OrderTimeoutWarning() extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {
  // state flag recording whether the pay event has arrived
  lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("ispayed-state", classOf[Boolean]))

  override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {
    // read the state flag first (the rest of this method is truncated in the original)
    val isPayed = isPayedState.value()
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
    // check whether the order has been paid
    val isPayed = isPayedState.value()
    if (isPayed) {
      out.collect(OrderResult(ctx.getCurrentKey, "order payed successfully"))
    } else {
      out.collect(OrderResult(ctx.getCurrentKey, "order timeout"))
    }
    // clear the state
    isPayedState.clear()
  }
}
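The body of `processElement` above is cut off in the original. A sketch of how it is usually completed is shown below: register an event-time timer when the "create" event arrives and set the flag when the "pay" event arrives, so that `onTimer` can decide between success and timeout. The 15-minute wait is an assumption.

```scala
// sketch of the truncated processElement body; the 15-minute wait is an assumption
override def processElement(value: OrderEvent,
                            ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context,
                            out: Collector[OrderResult]): Unit = {
  val isPayed = isPayedState.value()
  if (value.eventType == "create" && !isPayed) {
    // wait 15 minutes (event time) for the matching pay event
    ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L + 15 * 60 * 1000L)
  } else if (value.eventType == "pay") {
    // pay arrived: remember it; onTimer will then report success
    isPayedState.update(true)
  }
}
```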
package com.atguigu.orderpay_detect

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

/**
  * Project: UserBehaviorAnalysis
  * Package: com.atguigu.orderpay_detect
  * Version: 1.0
  * Created by wushengran on 2019/9/25 14:15
  */
// case class for the receipt (arrival-of-payment) events
case class ReceiptEvent(txId: String, payChannel: String, eventTime: Long)

object TxMacthDetect {
  // side-output tags (the tag names were lost in the original formatting and are reconstructed here)
  val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")
  val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // order event stream
    //    val orderEventStream = env.readTextFile(resource.getPath)
    val orderEventStream = env.socketTextStream("localhost", 7777)
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
      })
      .filter(_.txId != "")
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // receipt event stream
    //    val receiptEventStream = env.readTextFile(receiptResource.getPath)
    val receiptEventStream = env.socketTextStream("localhost", 8888)
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0).trim, dataArray(1).trim, dataArray(2).toLong)
      })
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)
  }

  class TxPayMatch() extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
    // states holding the pay event and the receipt event that have already arrived
    lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("pay-state", classOf[OrderEvent]))
    lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receipt-state", classOf[ReceiptEvent]))
  }
}
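The original listing stops after declaring the two pieces of state in `TxPayMatch`, and `main` never actually connects the streams. In the usual version of this job the two keyed streams would be joined with `orderEventStream.connect(receiptEventStream).process(new TxPayMatch())`. A sketch of the missing `CoProcessFunction` logic is given below: each side caches its event and emits a matched pair once the other side arrives, with a timer to report one-sided events to the side outputs. The 5-second wait is an assumption.

```scala
// sketch of the truncated TxPayMatch body; the 5-second wait is an assumption
override def processElement1(pay: OrderEvent,
                             ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
                             out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
  val receipt = receiptState.value()
  if (receipt != null) {
    out.collect((pay, receipt))   // the receipt is already here: emit the match
    receiptState.clear()
  } else {
    payState.update(pay)          // cache the pay event and wait for the receipt
    ctx.timerService().registerEventTimeTimer(pay.eventTime * 1000L + 5000L)
  }
}

override def processElement2(receipt: ReceiptEvent,
                             ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context,
                             out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
  val pay = payState.value()
  if (pay != null) {
    out.collect((pay, receipt))   // the pay event is already here: emit the match
    payState.clear()
  } else {
    receiptState.update(receipt)  // cache the receipt and wait for the pay event
    ctx.timerService().registerEventTimeTimer(receipt.eventTime * 1000L + 5000L)
  }
}

override def onTimer(timestamp: Long,
                     ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext,
                     out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
  // whichever side is still cached when the timer fires goes to the matching side output
  if (payState.value() != null) ctx.output(unmatchedPays, payState.value())
  if (receiptState.value() != null) ctx.output(unmatchedReceipts, receiptState.value())
  payState.clear()
  receiptState.clear()
}
```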
package com.atguigu.orderpay_detect

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
  * Project: UserBehaviorAnalysis
  * Package: com.atguigu.orderpay_detect
  * Version: 1.0
  * Created by wushengran on 2019/9/25 15:40
  */
object TxMatchByJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // read the order event stream
    val resource = getClass.getResource("/OrderLog.csv")
    //    val orderEventStream = env.readTextFile(resource.getPath)
    val orderEventStream = env.socketTextStream("localhost", 7777)
      .map(data => {
        val dataArray = data.split(",")
        OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)
      })
      .filter(_.txId != "")
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // read the receipt event stream
    val receiptResource = getClass.getResource("/ReceiptLog.csv")
    //    val receiptEventStream = env.readTextFile(receiptResource.getPath)
    val receiptEventStream = env.socketTextStream("localhost", 8888)
      .map(data => {
        val dataArray = data.split(",")
        ReceiptEvent(dataArray(0).trim, dataArray(1).trim, dataArray(2).toLong)
      })
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.txId)

    // interval join of the two streams
    val processedStream = orderEventStream.intervalJoin(receiptEventStream)
      .between(Time.seconds(-5), Time.seconds(5))
      .process(new TxPayMatchByJoin())

    processedStream.print()
    env.execute("tx pay match by join job")
  }
}

class TxPayMatchByJoin() extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {
  override def processElement(left: OrderEvent, right: ReceiptEvent, ctx: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    out.collect((left, right))
  }
}
package com.atguigu.marketanalysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * Project: UserBehaviorAnalysis
  * Package: com.atguigu.marketanalysis
  * Version: 1.0
  * Created by wushengran on 2019/9/24 10:10
  */
// case class for the input ad-click events
case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)

// output case class for counts aggregated per province
case class CountByProvince(windowEnd: String, province: String, count: Long)

// blacklist warning output
case class BlackListWarning(userId: Long, adId: Long, msg: String)

object AdStatisticsByGeo {
  // side-output tag for the blacklist (the tag name was lost in the original formatting and is reconstructed here)
  val blackListOutputTag: OutputTag[BlackListWarning] = new OutputTag[BlackListWarning]("blacklist")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
  }

  class FilterBlackListUser(maxCount: Int) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent] {
    // current click count of this user on this ad
    lazy val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("count-state", classOf[Long]))
    // whether this user has already been reported to the blacklist
    lazy val isSentBlackList: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("issent-state", classOf[Boolean]))
    // timestamp of the timer that resets the count
    lazy val resetTimer: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("resettime-state", classOf[Long]))
  }
}

// pre-aggregation function: counts the clicks
class AdCountAgg() extends AggregateFunction[AdClickEvent, Long, Long] {
  override def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1
  override def createAccumulator(): Long = 0L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

// window function: wraps the count with the window end and the province key
class AdCountResult() extends WindowFunction[Long, CountByProvince, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[CountByProvince]): Unit = {
    out.collect(CountByProvince(new Timestamp(window.getEnd).toString, key, input.iterator.next()))
  }
}
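In the original, the `main` method of `AdStatisticsByGeo` stops after creating the environment, and the body of `FilterBlackListUser` is also truncated. The sketch below (to be placed inside `main`) shows how the pieces are typically wired: filter out users whose clicks on the same ad exceed the threshold, then count clicks per province in a sliding window. The input file name, the threshold of 100 and the window sizes are assumptions, not from the original.

```scala
// sketch only: file name, threshold and window sizes are assumptions
val adEventStream = env.readTextFile(getClass.getResource("/AdClickLog.csv").getPath)
  .map(data => {
    val dataArray = data.split(",")
    AdClickEvent(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim, dataArray(3).trim, dataArray(4).trim.toLong)
  })
  .assignAscendingTimestamps(_.timestamp * 1000L)

// key by (userId, adId) and filter out over-clicking users via the process function above
val filterBlackListStream = adEventStream
  .keyBy(data => (data.userId, data.adId))
  .process(new FilterBlackListUser(100))

// count the remaining clicks per province in a 1-hour window sliding every 5 seconds
val adCountStream = filterBlackListStream
  .keyBy(_.province)
  .timeWindow(Time.hours(1), Time.seconds(5))
  .aggregate(new AdCountAgg(), new AdCountResult())

adCountStream.print("count")
filterBlackListStream.getSideOutput(blackListOutputTag).print("blacklist")
env.execute("ad statistics job")
```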
package com.atguigu.marketanalysis

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * Project: UserBehaviorAnalysis
  * Package: com.atguigu.marketanalysis
  * Version: 1.0
  * Created by wushengran on 2019/9/23 15:37
  */
object AppMarketing {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val dataStream = env.addSource(new SimulatedEventSource())
      .assignAscendingTimestamps(_.timestamp)
      .filter(_.behavior != "UNINSTALL")
      .map(data => {
        ("dummyKey", 1L)
      })
      .keyBy(_._1) // group everything under one dummy key to get a total count
      .timeWindow(Time.hours(1), Time.seconds(10))
      .aggregate(new CountAgg(), new MarketingCountTotal())

    dataStream.print()
    env.execute("app marketing job")
  }
}

class CountAgg() extends AggregateFunction[(String, Long), Long, Long] {
  override def add(value: (String, Long), accumulator: Long): Long = accumulator + 1
  override def createAccumulator(): Long = 0L
  override def getResult(accumulator: Long): Long = accumulator
  override def merge(a: Long, b: Long): Long = a + b
}

class MarketingCountTotal() extends WindowFunction[Long, MarketingViewCount, String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketingViewCount]): Unit = {
    val startTs = new Timestamp(window.getStart).toString
    val endTs = new Timestamp(window.getEnd).toString
    val count = input.iterator.next()
    out.collect(MarketingViewCount(startTs, endTs, "app marketing", "total", count))
  }
}
package com.atguigu.marketanalysis

import java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

import scala.util.Random

/**
  * Project: UserBehaviorAnalysis
  * Package: com.atguigu.marketanalysis
  * Version: 1.0
  * Created by wushengran on 2019/9/23 15:06
  */
// case class for the input data
case class MarketingUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)

// case class for the output results
case class MarketingViewCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)

object AppMarketingByChannel {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  }
}

// custom source simulating marketing user-behavior events
class SimulatedEventSource() extends RichSourceFunction[MarketingUserBehavior] {
  // flag marking whether the source is still running
  var running = true
  // candidate user behaviors
  val behaviorTypes: Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNINSTALL")
  // candidate channels
  val channelSets: Seq[String] = Seq("wechat", "weibo", "appstore", "huaweistore")
  // random generator
  val rand: Random = new Random()

  override def cancel(): Unit = running = false

  override def run(ctx: SourceFunction.SourceContext[MarketingUserBehavior]): Unit = {
    // upper bound on the number of generated elements
    val maxElements = Long.MaxValue
    var count = 0L
  }
}

// custom window processing function
class MarketingCountByChannel() extends ProcessWindowFunction[((String, String), Long), MarketingViewCount, (String, String), TimeWindow] {
  override def process(key: (String, String), context: Context, elements: Iterable[((String, String), Long)], out: Collector[MarketingViewCount]): Unit = {
    val startTs = new Timestamp(context.window.getStart).toString
    val endTs = new Timestamp(context.window.getEnd).toString
    val channel = key._1
    val behavior = key._2
    val count = elements.size
    out.collect(MarketingViewCount(startTs, endTs, channel, behavior, count))
  }
}
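Two pieces of this file are truncated in the original: the generation loop in `SimulatedEventSource.run()` and the pipeline in `AppMarketingByChannel.main()`. The sketch below shows how they are usually completed, keyed by `(channel, behavior)`. The sleep interval, window sizes and job name are assumptions, not from the original.

```scala
// sketch of SimulatedEventSource.run(): emit random events until cancelled
while (running && count < maxElements) {
  val id = UUID.randomUUID().toString
  val behavior = behaviorTypes(rand.nextInt(behaviorTypes.size))
  val channel = channelSets(rand.nextInt(channelSets.size))
  val ts = System.currentTimeMillis()
  ctx.collect(MarketingUserBehavior(id, behavior, channel, ts))
  count += 1
  TimeUnit.MILLISECONDS.sleep(10L) // generation pace is an assumption
}

// sketch of AppMarketingByChannel.main(), after the env setup:
val dataStream = env.addSource(new SimulatedEventSource())
  .assignAscendingTimestamps(_.timestamp)
  .filter(_.behavior != "UNINSTALL")
  .map(data => ((data.channel, data.behavior), 1L))
  .keyBy(_._1) // key by (channel, behavior)
  .timeWindow(Time.hours(1), Time.seconds(10))
  .process(new MarketingCountByChannel())

dataStream.print()
env.execute("app marketing by channel job")
```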
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>UserBehaviorAnalysis</artifactId>
        <groupId>com.atguigu</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>MarketAnalysis</artifactId>
</project>