日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)

發(fā)布時(shí)間:2025/3/11 数据库 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3) 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Flink總共有三種時(shí)間語義:Processing time(處理時(shí)間)、Event time(事件時(shí)間)以及Ingestion time(攝入時(shí)間)。關(guān)于這些時(shí)間語義的具體解釋,可以參考另一篇文章Flink的時(shí)間與watermarks詳解。本文主要講解Flink Table API & SQL中基于時(shí)間的算子如何定義時(shí)間語義。通過本文你可以了解到:

  • 時(shí)間屬性的簡(jiǎn)介
  • 處理時(shí)間
  • 事件時(shí)間

時(shí)間屬性簡(jiǎn)介

Flink TableAPI&SQL中的基于時(shí)間的操作(如window),需要指定時(shí)間語義,表可以根據(jù)指定的時(shí)間戳提供一個(gè)邏輯時(shí)間屬性。

時(shí)間屬性是表schama的一部分,當(dāng)使用DDL創(chuàng)建表時(shí)、DataStream轉(zhuǎn)為表時(shí)或者使用TableSource時(shí),會(huì)定義時(shí)間屬性。一旦時(shí)間屬性被定義完成,該時(shí)間屬性可以看做是一個(gè)字段的引用,從而在基于時(shí)間的操作中使用該字段。

時(shí)間屬性像一個(gè)時(shí)間戳,可以被訪問并參與計(jì)算,如果一個(gè)時(shí)間屬性參與計(jì)算,那么該時(shí)間屬性會(huì)被霧化成一個(gè)常規(guī)的時(shí)間戳,常規(guī)的時(shí)間戳不能與Flink的時(shí)間與水位線兼容,不能被基于時(shí)間的操作所使用。

Flink TableAPI & SQL所需要的時(shí)間屬性可以通過Datastream程序中指定,如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // 默認(rèn)// 可以選擇: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

處理時(shí)間

基于本地的機(jī)器時(shí)間,是一種最簡(jiǎn)單的時(shí)間語義,但是不能保證結(jié)果一致性,使用該時(shí)間語義不需要提取時(shí)間戳和生成水位線。總共有三種方式定義處理時(shí)間屬性,具體如下

DDL語句創(chuàng)建表時(shí)定義處理時(shí)間

處理時(shí)間的屬性可以在DDL語句中被定義為一個(gè)計(jì)算列,需要使用PROCTIME()函數(shù),如下所示:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time AS PROCTIME() -- 聲明一個(gè)額外字段,作為處理時(shí)間屬性 ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE); -- 10分鐘的滾動(dòng)窗口

DataStream轉(zhuǎn)為Table的過程中定義處理時(shí)間

在將DataStream轉(zhuǎn)為表時(shí),在schema定義中可以通過.proctime屬性指定時(shí)間屬性,并將其放在其他schema字段的最后面,具體如下:

DataStream<Tuple2<String, String>> stream = ...; // 聲明一個(gè)額外邏輯字段作為處理時(shí)間屬性 Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.proctime");WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

自定義TableSource并實(shí)現(xiàn)DefinedProctimeAttribute 接口,如下:

// 定義個(gè)帶有處理時(shí)間屬性的table source public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name" , "data"};TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 創(chuàng)建streamDataStream<Row> stream = ...;return stream;}@Overridepublic String getProctimeAttribute() {// 該字段會(huì)追加到schema中,作為第三個(gè)字段return "user_action_time";} }// 注冊(cè)table source tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

事件時(shí)間

基于記錄的具體時(shí)間戳,即便是存在亂序或者遲到數(shù)據(jù)也會(huì)保證結(jié)果的一致性。總共有三種方式定義處理時(shí)間屬性,具體如下

DDL語句創(chuàng)建表時(shí)定事件時(shí)間

事件時(shí)間屬性可以通過 WATERMARK語句進(jìn)行定義,如下:

CREATE TABLE user_actions (user_name STRING,data STRING,user_action_time TIMESTAMP(3),-- 聲明user_action_time作為事件時(shí)間屬性,并允許5S的延遲 WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND ) WITH (... );SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name) FROM user_actions GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

DataStream轉(zhuǎn)為Table的過程中定義事件時(shí)間

當(dāng)定義Schema時(shí)通過.rowtime屬性指定事件時(shí)間屬性,必須在DataStream中指定時(shí)間戳與水位線。例如在數(shù)據(jù)集中,事件時(shí)間屬性為event_time,此時(shí)Table中的事件時(shí)間字段中可以通過’event_time. rowtime‘來指定。

目前Flink支持兩種方式定義EventTime字段,如下:

// 方式1: // 提取timestamp并分配watermarks DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 聲明一個(gè)額外邏輯字段作為事件時(shí)間屬性 // 在table schema的末尾使用user_action_time.rowtime定義事件時(shí)間屬性 // 系統(tǒng)會(huì)在TableEnvironment中獲取事件時(shí)間屬性 Table table = tEnv.fromDataStream(stream, "user_name, data, user_action_time.rowtime");// 方式2:// 從第一個(gè)字段提取timestamp并分配watermarks DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);// 第一個(gè)字段已經(jīng)用來提取時(shí)間戳,可以直接使用對(duì)應(yīng)的字段作為事件時(shí)間屬性 Table table = tEnv.fromDataStream(stream, "user_action_time.rowtime, user_name, data");// 使用:WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

使用TableSource

另外也可以在創(chuàng)建TableSource的時(shí)候,實(shí)現(xiàn)DefinedRowtimeAttributes接口來定義EventTime字段,在接口中需要實(shí)現(xiàn)getRowtimeAttributeDescriptors方法,創(chuàng)建基于EventTime的時(shí)間屬性信息。

// 定義帶有rowtime屬性的table source public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {@Overridepublic TypeInformation<Row> getReturnType() {String[] names = new String[] {"user_name", "data", "user_action_time"};TypeInformation[] types =new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};return Types.ROW(names, types);}@Overridepublic DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {// 創(chuàng)建流,基于user_action_time屬性分配水位線DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);return stream;}@Overridepublic List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {// 標(biāo)記user_action_time字段作為事件時(shí)間屬性// 創(chuàng)建user_action_time描述符,用來標(biāo)識(shí)時(shí)間屬性字段RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor("user_action_time",new ExistingField("user_action_time"),new AscendingTimestamps());List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);return listRowtimeAttrDescr;} }// register表 tEnv.registerTableSource("user_actions", new UserActionSource());WindowedTable windowedTable = tEnv.from("user_actions").window(Tumble.over("10.minutes").on("user_action_time").as("userActionWindow"));

小結(jié)

本文主要介紹了如何在Flink Table API和SQL中使用時(shí)間語義,可以使用兩種時(shí)間語義:處理時(shí)間和事件時(shí)間。分別對(duì)每種的時(shí)間語義的使用方式進(jìn)行了詳細(xì)解釋。

往期精彩回顧

Flink Table API & SQL編程指南(1)

Flink Table API & SQL編程指南之動(dòng)態(tài)表(2)

總結(jié)

以上是生活随笔為你收集整理的duration转为时间戳_Flink Table APIamp;SQL编程指南之时间属性(3)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。