日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Please move ‘proctime(r_proctime)‘ to the end of the schema.以及rowtime和proctime

發布時間:2023/12/31 编程问答 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Please move ‘proctime(r_proctime)‘ to the end of the schema.以及rowtime和proctime 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

代碼:

List<Tuple3<String,Long,Timestamp>>ratesHistoryData = new ArrayList<>();ratesHistoryData.add(Tuple3.of("US Dollar", 102L,new Timestamp(1L)));ratesHistoryData.add(Tuple3.of("Euro", 114L,new Timestamp(1L)));ratesHistoryData.add(Tuple3.of("Yen", 1L,new Timestamp(1L)));ratesHistoryData.add(Tuple3.of("Euro", 116L,new Timestamp(1L)));ratesHistoryData.add(Tuple3.of("Euro", 119L,new Timestamp(1L)));// List轉DataStreamDataStream<Tuple3<String, Long,Timestamp>> ratesHistoryStream = env.fromCollection(ratesHistoryData);// Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate,r_proctime.proctime"); // DataStream轉TableTable ratesHistory = tEnv.fromDataStream(ratesHistoryStream, $("r_currency"), $("r_rate"), $("r_proctime").proctime());

?

完整報錯:

Exception:Exception in thread "main" org.apache.flink.table.api.ValidationException: The proctime attribute can only be appended to the table schema and not replace an existing field. Please move 'proctime(r_proctime)' to the end of the schema.at org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.validateProcTimeAttributeAppended(FieldInfoUtils.java:512)at org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.visit(FieldInfoUtils.java:483)at org.apache.flink.table.typeutils.FieldInfoUtils$IndexedExprToFieldInfo.visit(FieldInfoUtils.java:459)at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39)at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132)at org.apache.flink.table.typeutils.FieldInfoUtils.lambda$extractFieldInfosFromTupleType$6(FieldInfoUtils.java:421)at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInfosFromTupleType(FieldInfoUtils.java:422)at org.apache.flink.table.typeutils.FieldInfoUtils.extractFieldInformation(FieldInfoUtils.java:264)at org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(FieldInfoUtils.java:233)at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.lambda$asQueryOperation$0(StreamTableEnvironmentImpl.java:384)at java.util.Optional.map(Optional.java:215)at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.asQueryOperation(StreamTableEnvironmentImpl.java:383)at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.fromDataStream(StreamTableEnvironmentImpl.java:230)at JoinwithTemporalTable.main(JoinwithTemporalTable.java:51)

先講下proctime和rowtime,根據文檔[1]

函數代表的含義完整案例示范
proctime()Processing Time完整案例
rowtime()Event Time完整案例

?

?

注意哈,這個問題的解決方案你不能只看一句代碼,需要看下前面你的數據。

解決方案(rowtime方式):

如果你想使用rowtime,也就是Event Time

那么修改方案是:

? ? ? ? List<Tuple3<String,Long,Timestamp>>ratesHistoryData = new ArrayList<>();
? ? ? ? ratesHistoryData.add(Tuple3.of("US Dollar", 102L,new Timestamp(1L)));
? ? ? ? ratesHistoryData.add(Tuple3.of("Euro", 114L,new Timestamp(1L)));
? ? ? ? ratesHistoryData.add(Tuple3.of("Yen", 1L,new Timestamp(1L)));
? ? ? ? ratesHistoryData.add(Tuple3.of("Euro", 116L,new Timestamp(1L)));
? ? ? ? ratesHistoryData.add(Tuple3.of("Euro", 119L,new Timestamp(1L)));

Table ratesHistory = tEnv.createTemporaryView("Orders", orderA,$("o_currency"),$("o_ratet"),$("o_proctime").rowtime());

?

解決方案(proctime方式)

如果你想使用proctime,也就是Processing Time

那么修改方案是:

? ? ? ? DataStream<Tuple2<String, Long>> orderA = env.fromCollection(Arrays.asList(
? ? ? ? ? ? ? ? Tuple2.of("US Dollar", 102L),
? ? ? ? ? ? ? ? Tuple2.of("US Dollar", 102L),
? ? ? ? ? ? ? ? Tuple2.of("US Dollar", 102L)));

? ? ? ? tableEnv.createTemporaryView("Orders", orderA, $("o_currency"), $("o_rate"), $("o_proctime").proctime());

?

注意兩點:

①這里的解決方案中我特意都提到了原始數據。

因為這個報錯是和原始數據息息相關的,你如果只盯著報錯的那一句是不可能解決的。

proctime的總列數=原始數據列數+1

因為ProcessingTime是我們新加入的一列數據,你顯然不能把數據里包含的原來的時間戳用proctime標記為ProcessingTime對吧?

rowtime的總列數=原始數據列數

這是因為,EventTime就來自數據本身,處理時自然不用新增一列。

?

?

Reference:

[1]Time Attributes

[2]Apache Flink 進階入門(二):Time 深度解析

總結

以上是生活随笔為你收集整理的Please move ‘proctime(r_proctime)‘ to the end of the schema.以及rowtime和proctime的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。