duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)
Flink總共有三種時(shí)間語義:Processing time(處理時(shí)間)、Event time(事件時(shí)間)以及Ingestion time(攝入時(shí)間)。關(guān)于這些時(shí)間語義的具體解釋,可以參考另一篇文章Flink的時(shí)間與watermarks詳解。本文主要講解Flink Table API & SQL中基于時(shí)間的算子如何定義時(shí)間語義。通過本文你可以了解到:
- 時(shí)間屬性的簡(jiǎn)介
- 處理時(shí)間
- 事件時(shí)間
時(shí)間屬性簡(jiǎn)介
Flink TableAPI&SQL中的基于時(shí)間的操作(如window),需要指定時(shí)間語義,表可以根據(jù)指定的時(shí)間戳提供一個(gè)邏輯時(shí)間屬性。
時(shí)間屬性是表schama的一部分,當(dāng)使用DDL創(chuàng)建表時(shí)、DataStream轉(zhuǎn)為表時(shí)或者使用TableSource時(shí),會(huì)定義時(shí)間屬性。一旦時(shí)間屬性被定義完成,該時(shí)間屬性可以看做是一個(gè)字段的引用,從而在基于時(shí)間的操作中使用該字段。
時(shí)間屬性像一個(gè)時(shí)間戳,可以被訪問并參與計(jì)算,如果一個(gè)時(shí)間屬性參與計(jì)算,那么該時(shí)間屬性會(huì)被霧化成一個(gè)常規(guī)的時(shí)間戳,常規(guī)的時(shí)間戳不能與Flink的時(shí)間與水位線兼容,不能被基于時(shí)間的操作所使用。
Flink TableAPI & SQL所需要的時(shí)間屬性可以通過Datastream程序中指定,如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默認(rèn)// 可以選擇: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);處理時(shí)間
基于本地的機(jī)器時(shí)間,是一種最簡(jiǎn)單的時(shí)間語義,但是不能保證結(jié)果一致性,使用該時(shí)間語義不需要提取時(shí)間戳和生成水位線。總共有三種方式定義處理時(shí)間屬性,具體如下
DDL語句創(chuàng)建表時(shí)定義處理時(shí)間
處理時(shí)間的屬性可以在DDL語句中被定義為一個(gè)計(jì)算列,需要使用PROCTIME()函數(shù),如下所示:
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 聲明一個(gè)額外字段,作為處理時(shí)間屬性 ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分鐘的滾動(dòng)窗口DataStream轉(zhuǎn)為Table的過程中定義處理時(shí)間
在將DataStream轉(zhuǎn)為表時(shí),在schema定義中可以通過.proctime屬性指定時(shí)間屬性,并將其放在其他schema字段的最后面,具體如下:
DataStream<Tuple2<String, String>> stream = ...; // 聲明一個(gè)額外邏輯字段作為處理時(shí)間屬性 Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));使用TableSource
自定義TableSource并實(shí)現(xiàn)DefinedProctimeAttribute 接口,如下:
// 定義個(gè)帶有處理時(shí)間屬性的table source public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 創(chuàng)建streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 該字段會(huì)追加到schema中,作為第三個(gè)字段return "user_action_time";} }// 注冊(cè)table source tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));事件時(shí)間
基于記錄的具體時(shí)間戳,即便是存在亂序或者遲到數(shù)據(jù)也會(huì)保證結(jié)果的一致性。總共有三種方式定義處理時(shí)間屬性,具體如下
DDL語句創(chuàng)建表時(shí)定事件時(shí)間
事件時(shí)間屬性可以通過 WATERMARK語句進(jìn)行定義,如下:
CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 聲明user_action_time作為事件時(shí)間屬性,并允許5S的延遲 WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);DataStream轉(zhuǎn)為Table的過程中定義事件時(shí)間
當(dāng)定義Schema時(shí)通過.rowtime屬性指定事件時(shí)間屬性,必須在DataStream中指定時(shí)間戳與水位線。例如在數(shù)據(jù)集中,事件時(shí)間屬性為event_time,此時(shí)Table中的事件時(shí)間字段中可以通過’event_time. rowtime‘來指定。
目前Flink支持兩種方式定義EventTime字段,如下:
// 方式1: // 提取timestamp并分配watermarks DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 聲明一個(gè)額外邏輯字段作為事件時(shí)間屬性 // 在table schema的末尾使用user_action_time.rowtime定義事件時(shí)間屬性 // 系統(tǒng)會(huì)在TableEnvironment中獲取事件時(shí)間屬性 Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");// 方式2:// 從第一個(gè)字段提取timestamp并分配watermarks DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 第一個(gè)字段已經(jīng)用來提取時(shí)間戳,可以直接使用對(duì)應(yīng)的字段作為事件時(shí)間屬性 Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");// 使用:WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));使用TableSource
另外也可以在創(chuàng)建TableSource的時(shí)候,實(shí)現(xiàn)DefinedRowtimeAttributes接口來定義EventTime字段,在接口中需要實(shí)現(xiàn)getRowtimeAttributeDescriptors方法,創(chuàng)建基于EventTime的時(shí)間屬性信息。
// 定義帶有rowtime屬性的table source public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};TypeInformation[] types =new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 創(chuàng)建流,基于user_action_time屬性分配水位線DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);return stream;}@Overridepublic List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {// 標(biāo)記user_action_time字段作為事件時(shí)間屬性// 創(chuàng)建user_action_time描述符,用來標(biāo)識(shí)時(shí)間屬性字段RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor("user_action_time",new ExistingField("user_action_time"),new AscendingTimestamps());List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);return listRowtimeAttrDescr;} }// register表 tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));小結(jié)
本文主要介紹了如何在Flink Table API和SQL中使用時(shí)間語義,可以使用兩種時(shí)間語義:處理時(shí)間和事件時(shí)間。分別對(duì)每種的時(shí)間語義的使用方式進(jìn)行了詳細(xì)解釋。
往期精彩回顧
Flink Table API & SQL編程指南(1)
Flink Table API & SQL編程指南之動(dòng)態(tài)表(2)
總結(jié)
以上是生活随笔為你收集整理的duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++PrimerPlus学习——第九章
- 下一篇: oracle tsn 数据库,【Orac