
Flink SQL JSON Format Source Code Analysis

Published: 2024/3/12

Parsing JSON data with Flink SQL is very simple: just set the format to json in the DDL statement, like this:

CREATE TABLE kafka_source (
    funcName STRING,
    data ROW<snapshots ARRAY<ROW<content_type STRING, url STRING>>, audio ARRAY<ROW<content_type STRING, url STRING>>>,
    resultMap ROW<`result` MAP<STRING, STRING>, isSuccess BOOLEAN>,
    meta MAP<STRING, STRING>,
    `type` INT,
    `timestamp` BIGINT,
    arr ARRAY<ROW<address STRING, city STRING>>,
    map MAP<STRING, INT>,
    doublemap MAP<STRING, MAP<STRING, INT>>,
    proctime as PROCTIME()
) WITH (
    'connector' = 'kafka',  -- use the kafka connector
    'topic' = 'test',  -- kafka topic
    'properties.bootstrap.servers' = 'master:9092,storm1:9092,storm2:9092',  -- broker connection info
    'properties.group.id' = 'jason_flink_test',  -- consumer group id for reading kafka
    'scan.startup.mode' = 'latest-offset',  -- where to start reading
    'format' = 'json',  -- the source format is json
    'json.fail-on-missing-field' = 'true',  -- fail the job when a field is missing
    'json.ignore-parse-errors' = 'false'  -- do not skip records that fail to parse
)
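For reference, here is a hypothetical message shape that would match the schema above (every value is made up purely for illustration):

```json
{
  "funcName": "upload",
  "data": {
    "snapshots": [{"content_type": "image/png", "url": "http://example.com/1.png"}],
    "audio": [{"content_type": "audio/mpeg", "url": "http://example.com/1.mp3"}]
  },
  "resultMap": {"result": {"code": "0"}, "isSuccess": true},
  "meta": {"source": "app"},
  "type": 1,
  "timestamp": 1640966400000,
  "arr": [{"address": "No.1 Street", "city": "Beijing"}],
  "map": {"flink": 1},
  "doublemap": {"inner": {"score": 100}}
}
```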

Have you ever wondered how this is implemented under the hood? This article walks you through the implementation details.

When you submit a SQL statement, Flink takes it through several important stages: parsing, validation, optimization, and translation. The earlier stages are fairly involved, so we skip them here and jump straight to the key code. During the conversion of the sqlNode into a relNode, execution reaches CatalogSourceTable#createDynamicTableSource; this class translates Calcite's RelOptTable into Flink's TableSourceTable object.

createDynamicTableSource source code

private DynamicTableSource createDynamicTableSource(
        FlinkContext context, ResolvedCatalogTable catalogTable) {
    final ReadableConfig config = context.getTableConfig().getConfiguration();
    return FactoryUtil.createTableSource(
            schemaTable.getCatalog(),
            schemaTable.getTableIdentifier(),
            catalogTable,
            config,
            Thread.currentThread().getContextClassLoader(),
            schemaTable.isTemporary());
}

This essentially creates the Kafka source stream table, which then calls the FactoryUtil#createTableSource method.

createTableSource source code

public static DynamicTableSource createTableSource(
        @Nullable Catalog catalog,
        ObjectIdentifier objectIdentifier,
        ResolvedCatalogTable catalogTable,
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
    final DefaultDynamicTableContext context =
            new DefaultDynamicTableContext(
                    objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
    try {
        // Look up the matching factory -- here it is KafkaDynamicTableFactory
        final DynamicTableSourceFactory factory =
                getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
        // Create the dynamic table source
        return factory.createDynamicTableSource(context);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Unable to create a source for reading table '%s'.\n\n"
                                + "Table options are:\n\n"
                                + "%s",
                        objectIdentifier.asSummaryString(),
                        catalogTable.getOptions().entrySet().stream()
                                .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                .sorted()
                                .collect(Collectors.joining("\n"))),
                t);
    }
}

There are two important steps in this method: first obtain the matching factory object, then create the DynamicTableSource instance. getDynamicTableFactory actually delegates to the discoverFactory method, which, as the name suggests, discovers the factory.

discoverFactory source code

public static <T extends Factory> T discoverFactory(
        ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
    final List<Factory> factories = discoverFactories(classLoader);

    final List<Factory> foundFactories =
            factories.stream()
                    .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
                    .collect(Collectors.toList());

    if (foundFactories.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Could not find any factories that implement '%s' in the classpath.",
                        factoryClass.getName()));
    }

    final List<Factory> matchingFactories =
            foundFactories.stream()
                    .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
                    .collect(Collectors.toList());

    if (matchingFactories.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
                                + "Available factory identifiers are:\n\n"
                                + "%s",
                        factoryIdentifier,
                        factoryClass.getName(),
                        foundFactories.stream()
                                .map(Factory::factoryIdentifier)
                                .distinct()
                                .sorted()
                                .collect(Collectors.joining("\n"))));
    }
    if (matchingFactories.size() > 1) {
        throw new ValidationException(
                String.format(
                        "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
                                + "Ambiguous factory classes are:\n\n"
                                + "%s",
                        factoryIdentifier,
                        factoryClass.getName(),
                        matchingFactories.stream()
                                .map(f -> f.getClass().getName())
                                .sorted()
                                .collect(Collectors.joining("\n"))));
    }

    return (T) matchingFactories.get(0);
}

This code is simple enough to need no extra comments, and the logic is very clear: load all factories via the SPI mechanism, then filter by factoryIdentifier to keep the matching one — here, the kafka connector. The remainder is error handling for the not-found and ambiguous cases.
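The identifier-matching step can be illustrated in isolation. The sketch below uses a toy `Factory` stand-in (not Flink's actual interface) to mimic how discoverFactory filters the loaded factories by factoryIdentifier:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FactoryFilterSketch {

    // Toy stand-in for Flink's Factory interface.
    interface Factory {
        String factoryIdentifier();
    }

    // Same shape as FactoryUtil.discoverFactory: filter by identifier,
    // then fail on "none found" or "more than one found".
    static Factory discover(List<Factory> loaded, String identifier) {
        List<Factory> matching =
                loaded.stream()
                        .filter(f -> f.factoryIdentifier().equals(identifier))
                        .collect(Collectors.toList());
        if (matching.isEmpty()) {
            throw new IllegalStateException(
                    "Could not find any factory for identifier '" + identifier + "'");
        }
        if (matching.size() > 1) {
            throw new IllegalStateException(
                    "Multiple factories for identifier '" + identifier + "'");
        }
        return matching.get(0);
    }

    public static void main(String[] args) {
        // Pretend SPI has already loaded these three factories.
        List<Factory> loaded =
                Arrays.asList((Factory) () -> "kafka", () -> "json", () -> "csv");
        System.out.println(discover(loaded, "json").factoryIdentifier()); // prints: json
    }
}
```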

discoverFactories source code

private static List<Factory> discoverFactories(ClassLoader classLoader) {
    try {
        final List<Factory> result = new LinkedList<>();
        ServiceLoader.load(Factory.class, classLoader)
                .iterator()
                .forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for factories.", e);
        throw new TableException("Could not load service provider for factories.", e);
    }
}

This code should look familiar; an earlier article covered it. It loads all Factory implementations and returns them as a list.

Now we come to today's main topic.

createDynamicTableSource source code

public DynamicTableSource createDynamicTableSource(Context context) {
    TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    ReadableConfig tableOptions = helper.getOptions();
    Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
            getKeyDecodingFormat(helper);
    // The format logic
    DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
            getValueDecodingFormat(helper);
    helper.validateExcept(new String[] {"properties."});
    KafkaOptions.validateTableSourceOptions(tableOptions);
    validatePKConstraints(
            context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
    StartupOptions startupOptions = KafkaOptions.getStartupOptions(tableOptions);
    Properties properties =
            KafkaOptions.getKafkaProperties(context.getCatalogTable().getOptions());
    properties.setProperty(
            "flink.partition-discovery.interval-millis",
            String.valueOf(
                    tableOptions
                            .getOptional(KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY)
                            .map(Duration::toMillis)
                            .orElse(-9223372036854775808L)));
    DataType physicalDataType =
            context.getCatalogTable().getSchema().toPhysicalRowDataType();
    int[] keyProjection =
            KafkaOptions.createKeyFormatProjection(tableOptions, physicalDataType);
    int[] valueProjection =
            KafkaOptions.createValueFormatProjection(tableOptions, physicalDataType);
    String keyPrefix =
            (String) tableOptions.getOptional(KafkaOptions.KEY_FIELDS_PREFIX).orElse((Object) null);
    return this.createKafkaTableSource(
            physicalDataType,
            (DecodingFormat) keyDecodingFormat.orElse((Object) null),
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            KafkaOptions.getSourceTopics(tableOptions),
            KafkaOptions.getSourceTopicPattern(tableOptions),
            properties,
            startupOptions.startupMode,
            startupOptions.specificOffsets,
            startupOptions.startupTimestampMillis);
}

The getValueDecodingFormat method eventually calls the discoverOptionalFormatFactory method.

discoverOptionalDecodingFormat and discoverOptionalFormatFactory source code

public <I, F extends DecodingFormatFactory<I>>
        Optional<DecodingFormat<I>> discoverOptionalDecodingFormat(
                Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
    return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
            .map(
                    formatFactory -> {
                        String formatPrefix = formatPrefix(formatFactory, formatOption);
                        try {
                            return formatFactory.createDecodingFormat(
                                    context, projectOptions(formatPrefix));
                        } catch (Throwable t) {
                            throw new ValidationException(
                                    String.format(
                                            "Error creating scan format '%s' in option space '%s'.",
                                            formatFactory.factoryIdentifier(), formatPrefix),
                                    t);
                        }
                    });
}

private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
        Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
    final String identifier = allOptions.get(formatOption);
    if (identifier == null) {
        return Optional.empty();
    }
    // Filter out the matching format -- this is the same discoverFactory shown above
    final F factory =
            discoverFactory(context.getClassLoader(), formatFactoryClass, identifier);
    String formatPrefix = formatPrefix(factory, formatOption);
    // log all used options of other factories
    consumedOptionKeys.addAll(
            factory.requiredOptions().stream()
                    .map(ConfigOption::key)
                    .map(k -> formatPrefix + k)
                    .collect(Collectors.toSet()));
    consumedOptionKeys.addAll(
            factory.optionalOptions().stream()
                    .map(ConfigOption::key)
                    .map(k -> formatPrefix + k)
                    .collect(Collectors.toSet()));
    return Optional.of(factory);
}

The logic here is the same as the connector loading above: SPI first loads all formats, then the factoryIdentifier filter picks out the matching one — in this case json. Once the formatFactory is returned, format creation proceeds to JsonFormatFactory#createDecodingFormat, which actually creates the DecodingFormat object.

createDecodingFormat source code

@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
        DynamicTableFactory.Context context, ReadableConfig formatOptions) {
    // Validate the format options
    FactoryUtil.validateFactoryOptions(this, formatOptions);
    // Validate json.fail-on-missing-field and json.ignore-parse-errors
    validateDecodingFormatOptions(formatOptions);

    // Read json.fail-on-missing-field and json.ignore-parse-errors
    final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
    final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
    // Read timestamp-format.standard
    TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);

    return new DecodingFormat<DeserializationSchema<RowData>>() {
        @Override
        public DeserializationSchema<RowData> createRuntimeDecoder(
                DynamicTableSource.Context context, DataType producedDataType) {
            final RowType rowType = (RowType) producedDataType.getLogicalType();
            final TypeInformation<RowData> rowDataTypeInfo =
                    context.createTypeInformation(producedDataType);
            return new JsonRowDataDeserializationSchema(
                    rowType,
                    rowDataTypeInfo,
                    failOnMissingField,
                    ignoreParseErrors,
                    timestampOption);
        }

        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }
    };
}

The logic here is also very simple: it first validates the format options in general, then specifically validates the json.fail-on-missing-field and json.ignore-parse-errors pair, and finally creates the JsonRowDataDeserializationSchema object.

JsonRowDataDeserializationSchema source code

public JsonRowDataDeserializationSchema(
        RowType rowType,
        TypeInformation<RowData> resultTypeInfo,
        boolean failOnMissingField,
        boolean ignoreParseErrors,
        TimestampFormat timestampFormat) {
    if (ignoreParseErrors && failOnMissingField) {
        throw new IllegalArgumentException(
                "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
    }
    this.resultTypeInfo = checkNotNull(resultTypeInfo);
    this.failOnMissingField = failOnMissingField;
    this.ignoreParseErrors = ignoreParseErrors;
    this.runtimeConverter =
            new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                    .createConverter(checkNotNull(rowType));
    this.timestampFormat = timestampFormat;
    boolean hasDecimalType =
            LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
    if (hasDecimalType) {
        objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
    }
    objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
}

The most important step in the constructor is creating the JsonToRowDataConverters object. The call chain behind it is fairly deep, so only the key method is covered here.

createRowConverter source code

public JsonToRowDataConverter createRowConverter(RowType rowType) {
    final JsonToRowDataConverter[] fieldConverters =
            rowType.getFields().stream()
                    .map(RowType.RowField::getType)
                    .map(this::createConverter)
                    .toArray(JsonToRowDataConverter[]::new);
    final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);

    return jsonNode -> {
        ObjectNode node = (ObjectNode) jsonNode;
        int arity = fieldNames.length;
        GenericRowData row = new GenericRowData(arity);
        for (int i = 0; i < arity; i++) {
            String fieldName = fieldNames[i];
            JsonNode field = node.get(fieldName);
            Object convertedField = convertField(fieldConverters[i], fieldName, field);
            row.setField(i, convertedField);
        }
        return row;
    };
}

Because the data is in JSON format, the top level is a ROW type, so a row converter is created first; it builds a fieldConverter for each field, and each field is dispatched to a different conversion method according to the type you declared in the DDL. STRING data, for example, goes through the convertToString method.

convertToString source code

private StringData convertToString(JsonNode jsonNode) {
    if (jsonNode.isContainerNode()) {
        return StringData.fromString(jsonNode.toString());
    } else {
        return StringData.fromString(jsonNode.asText());
    }
}

Note that string data must be returned as StringData, otherwise you will get a type-cast error. If you are interested, take a look at how the other types are handled.
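The converter-per-field composition above can be sketched with plain Java, using a Map as a stand-in for Jackson's ObjectNode (the field names and types here are made up purely for illustration):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class RowConverterSketch {

    // Stand-in for JsonToRowDataConverter: JSON value in, internal value out.
    interface Converter extends Function<Object, Object> {}

    // Same shape as createRowConverter(): one converter per declared field,
    // plus a row converter that looks fields up by name and applies them.
    static Function<Map<String, Object>, Object[]> createRowConverter(
            String[] fieldNames, Converter[] fieldConverters) {
        return node -> {
            Object[] row = new Object[fieldNames.length];
            for (int i = 0; i < fieldNames.length; i++) {
                Object field = node.get(fieldNames[i]); // null when the field is missing
                row[i] = field == null ? null : fieldConverters[i].apply(field);
            }
            return row;
        };
    }

    public static void main(String[] args) {
        String[] names = {"funcName", "type"};
        Converter[] converters = {
            v -> v.toString(),            // STRING field
            v -> ((Number) v).intValue()  // INT field
        };
        Map<String, Object> json = new HashMap<>();
        json.put("funcName", "upload");
        json.put("type", 3L);
        Object[] row = createRowConverter(names, converters).apply(json);
        System.out.println(row[0] + "," + row[1]); // prints: upload,3
    }
}
```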

At this point the JsonRowDataDeserializationSchema object is fully constructed. What follows is optimization, conversion, and translation into the streamGraph; from there on, the process is the same as for a job written with the DataStream API.

When the job actually starts consuming data, JsonRowDataDeserializationSchema#deserialize is called to deserialize each record.

deserialize source code

@Override
public RowData deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    try {
        return convertToRowData(deserializeToJsonNode(message));
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null;
        }
        throw new IOException(
                format("Failed to deserialize JSON '%s'.", new String(message)), t);
    }
}
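The error-handling contract here — null message yields a null row, a parse failure is dropped when ignore-parse-errors is on, and otherwise the failure is surfaced with the raw payload — can be sketched as follows. The parser is a trivial stand-in, and the real method throws a checked IOException, shortened to an unchecked exception to keep the sketch compact:

```java
import java.nio.charset.StandardCharsets;

public class DeserializeSketch {

    // Trivial stand-in for deserializeToJsonNode(): accepts only "{}",
    // rejects everything else.
    static Object parse(byte[] message) {
        String s = new String(message, StandardCharsets.UTF_8);
        if (!s.equals("{}")) {
            throw new RuntimeException("malformed JSON");
        }
        return new Object();
    }

    // Mirrors deserialize()'s error handling: null in -> null out,
    // parse failure -> null when ignoreParseErrors, otherwise rethrow
    // with the raw payload included in the message.
    static Object deserialize(byte[] message, boolean ignoreParseErrors) {
        if (message == null) {
            return null;
        }
        try {
            return parse(message);
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                return null; // the bad record is silently dropped
            }
            throw new RuntimeException(
                    "Failed to deserialize JSON '"
                            + new String(message, StandardCharsets.UTF_8) + "'",
                    t);
        }
    }

    public static void main(String[] args) {
        System.out.println(deserialize("not json".getBytes(StandardCharsets.UTF_8), true)); // prints: null
        System.out.println(deserialize("{}".getBytes(StandardCharsets.UTF_8), false) != null); // prints: true
    }
}
```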

The data is first deserialized into a JsonNode object.

deserializeToJsonNode source code

public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
    return objectMapper.readTree(message);
}

As you can see, Flink uses jackson internally to parse the data. The jsonNode data is then converted into RowData.

convertToRowData source code

public RowData convertToRowData(JsonNode message) {
    return (RowData) runtimeConverter.convert(message);
}

The converter invoked here is the same one built when JsonRowDataDeserializationSchema was constructed:

return jsonNode -> {
    ObjectNode node = (ObjectNode) jsonNode;
    int arity = fieldNames.length;
    GenericRowData row = new GenericRowData(arity);
    for (int i = 0; i < arity; i++) {
        String fieldName = fieldNames[i];
        JsonNode field = node.get(fieldName);
        Object convertedField = convertField(fieldConverters[i], fieldName, field);
        row.setField(i, convertedField);
    }
    return row;
};

The final result is of type GenericRowData, which is an implementation of RowData, so it is RowData as far as the framework is concerned. The deserialized rows are then emitted downstream.

Summary

This article analyzed the source code behind Flink SQL's JSON Format, from constructing JsonRowDataDeserializationSchema to deserializing data in deserialize. To keep the article short, only the most important code of each step is shown, and many details are skipped over. If you are interested, step through the code in a debugger yourself; more implementation details may be covered in future posts.

Recommended reading

Best practices for real-time monitoring of Flink jobs

Best practices for real-time log collection with Flink on YARN

The brand-new Kafka connector in Flink 1.14.0

Custom deserialization of Kafka data in Flink 1.14.0

