Flink SQL JSON Format Source Code Analysis
Parsing JSON data with Flink SQL is very simple: just set the format to json in the DDL statement, like this:
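A minimal example DDL (the table name, topic, and schema here are hypothetical placeholders, not from the original article):

```sql
CREATE TABLE kafka_source (
    id   BIGINT,
    name STRING,
    ts   TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',                        -- hypothetical topic
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',                                -- the option this article traces
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'true'
)
```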
Have you ever wondered how this is implemented under the hood? This article walks through the implementation details.
When you submit a SQL statement, Flink runs it through several important steps: parsing, validation, optimization, and translation. Since the earlier steps are fairly involved, we will skip them for now and jump straight to the key code. While converting the SqlNode into a RelNode, execution reaches CatalogSourceTable#createDynamicTableSource; this class translates Calcite's RelOptTable into Flink's TableSourceTable object.
createDynamicTableSource source code

```java
private DynamicTableSource createDynamicTableSource(
        FlinkContext context, ResolvedCatalogTable catalogTable) {
    final ReadableConfig config = context.getTableConfig().getConfiguration();
    return FactoryUtil.createTableSource(
            schemaTable.getCatalog(),
            schemaTable.getTableIdentifier(),
            catalogTable,
            config,
            Thread.currentThread().getContextClassLoader(),
            schemaTable.isTemporary());
}
```

This creates the Kafka source stream table, which in turn calls FactoryUtil#createTableSource.
createTableSource source code

```java
public static DynamicTableSource createTableSource(
        @Nullable Catalog catalog,
        ObjectIdentifier objectIdentifier,
        ResolvedCatalogTable catalogTable,
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
    final DefaultDynamicTableContext context =
            new DefaultDynamicTableContext(
                    objectIdentifier, catalogTable, configuration, classLoader, isTemporary);
    try {
        // Look up the matching factory; here it is the KafkaDynamicTableFactory
        final DynamicTableSourceFactory factory =
                getDynamicTableFactory(DynamicTableSourceFactory.class, catalog, context);
        // Create the dynamic table
        return factory.createDynamicTableSource(context);
    } catch (Throwable t) {
        throw new ValidationException(
                String.format(
                        "Unable to create a source for reading table '%s'.\n\n"
                                + "Table options are:\n\n"
                                + "%s",
                        objectIdentifier.asSummaryString(),
                        catalogTable.getOptions().entrySet().stream()
                                .map(e -> stringifyOption(e.getKey(), e.getValue()))
                                .sorted()
                                .collect(Collectors.joining("\n"))),
                t);
    }
}
```

This method performs two important steps: first it looks up the matching factory object, then it creates the DynamicTableSource instance. getDynamicTableFactory in turn calls discoverFactory, which, as the name suggests, discovers factories.
discoverFactory source code

```java
public static <T extends Factory> T discoverFactory(
        ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
    final List<Factory> factories = discoverFactories(classLoader);

    final List<Factory> foundFactories =
            factories.stream()
                    .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
                    .collect(Collectors.toList());
    if (foundFactories.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Could not find any factories that implement '%s' in the classpath.",
                        factoryClass.getName()));
    }

    final List<Factory> matchingFactories =
            foundFactories.stream()
                    .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
                    .collect(Collectors.toList());
    if (matchingFactories.isEmpty()) {
        throw new ValidationException(
                String.format(
                        "Could not find any factory for identifier '%s' that implements '%s' in the classpath.\n\n"
                                + "Available factory identifiers are:\n\n"
                                + "%s",
                        factoryIdentifier,
                        factoryClass.getName(),
                        foundFactories.stream()
                                .map(Factory::factoryIdentifier)
                                .distinct()
                                .sorted()
                                .collect(Collectors.joining("\n"))));
    }
    if (matchingFactories.size() > 1) {
        throw new ValidationException(
                String.format(
                        "Multiple factories for identifier '%s' that implement '%s' found in the classpath.\n\n"
                                + "Ambiguous factory classes are:\n\n"
                                + "%s",
                        factoryIdentifier,
                        factoryClass.getName(),
                        matchingFactories.stream()
                                .map(f -> f.getClass().getName())
                                .sorted()
                                .collect(Collectors.joining("\n"))));
    }
    return (T) matchingFactories.get(0);
}
```

This code is relatively simple and its logic is clear, so no extra comments are needed: it loads all factories via the SPI mechanism, then filters them by factoryIdentifier; here the match is the Kafka connector. The remaining checks handle the error cases: no factory found, and more than one factory found.
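The filtering logic above can be sketched without any Flink dependencies. The following is a simplified, hypothetical re-creation (class and method names are made up; the real implementation lives in FactoryUtil):

```java
import java.util.List;
import java.util.stream.Collectors;

// A simplified sketch of the discoverFactory filtering logic:
// given all discovered factories, keep exactly the one whose
// identifier matches, rejecting the empty and ambiguous cases.
public class FactoryDiscoveryDemo {

    public interface Factory {
        String factoryIdentifier();
    }

    public static class KafkaFactory implements Factory {
        public String factoryIdentifier() { return "kafka"; }
    }

    public static class JsonFactory implements Factory {
        public String factoryIdentifier() { return "json"; }
    }

    public static Factory discover(List<Factory> factories, String identifier) {
        List<Factory> matching = factories.stream()
                .filter(f -> f.factoryIdentifier().equals(identifier))
                .collect(Collectors.toList());
        if (matching.isEmpty()) {
            throw new IllegalArgumentException("No factory for identifier: " + identifier);
        }
        if (matching.size() > 1) {
            throw new IllegalArgumentException("Ambiguous factories for: " + identifier);
        }
        return matching.get(0);
    }

    public static void main(String[] args) {
        List<Factory> all = List.of(new KafkaFactory(), new JsonFactory());
        // Selects the KafkaFactory, just as 'connector' = 'kafka' would
        System.out.println(discover(all, "kafka").getClass().getSimpleName());
    }
}
```

The same mechanism is reused later for formats: only the identifier changes (from "kafka" to "json").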
discoverFactories source code

```java
private static List<Factory> discoverFactories(ClassLoader classLoader) {
    try {
        final List<Factory> result = new LinkedList<>();
        ServiceLoader.load(Factory.class, classLoader)
                .iterator()
                .forEachRemaining(result::add);
        return result;
    } catch (ServiceConfigurationError e) {
        LOG.error("Could not load service provider for factories.", e);
        throw new TableException("Could not load service provider for factories.", e);
    }
}
```

This code should look familiar; an earlier article covered it. It loads every Factory on the classpath via ServiceLoader and returns them as a list.
Now for today's main topic.
createDynamicTableSource source code

```java
public DynamicTableSource createDynamicTableSource(Context context) {
    TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    ReadableConfig tableOptions = helper.getOptions();
    Optional<DecodingFormat<DeserializationSchema<RowData>>> keyDecodingFormat =
            getKeyDecodingFormat(helper);
    // This is where the format logic starts
    DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
            getValueDecodingFormat(helper);
    helper.validateExcept("properties.");
    KafkaOptions.validateTableSourceOptions(tableOptions);
    validatePKConstraints(
            context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat);
    StartupOptions startupOptions = KafkaOptions.getStartupOptions(tableOptions);
    Properties properties =
            KafkaOptions.getKafkaProperties(context.getCatalogTable().getOptions());
    properties.setProperty(
            "flink.partition-discovery.interval-millis",
            String.valueOf(
                    tableOptions
                            .getOptional(KafkaOptions.SCAN_TOPIC_PARTITION_DISCOVERY)
                            .map(Duration::toMillis)
                            .orElse(Long.MIN_VALUE)));
    DataType physicalDataType =
            context.getCatalogTable().getSchema().toPhysicalRowDataType();
    int[] keyProjection =
            KafkaOptions.createKeyFormatProjection(tableOptions, physicalDataType);
    int[] valueProjection =
            KafkaOptions.createValueFormatProjection(tableOptions, physicalDataType);
    String keyPrefix =
            tableOptions.getOptional(KafkaOptions.KEY_FIELDS_PREFIX).orElse(null);
    return createKafkaTableSource(
            physicalDataType,
            keyDecodingFormat.orElse(null),
            valueDecodingFormat,
            keyProjection,
            valueProjection,
            keyPrefix,
            KafkaOptions.getSourceTopics(tableOptions),
            KafkaOptions.getSourceTopicPattern(tableOptions),
            properties,
            startupOptions.startupMode,
            startupOptions.specificOffsets,
            startupOptions.startupTimestampMillis);
}
```

getValueDecodingFormat eventually calls the discoverOptionalFormatFactory method.
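One small detail above worth calling out: the raw constant -9223372036854775808L in the original (decompiled) listing is just Long.MIN_VALUE, the sentinel used when no partition-discovery interval is configured. A minimal, Flink-free sketch of that option resolution (method and class names here are made up):

```java
import java.time.Duration;
import java.util.Optional;

// Sketch of how flink.partition-discovery.interval-millis is resolved:
// if the user never sets the option, it falls back to Long.MIN_VALUE,
// which effectively disables partition discovery.
public class DiscoveryIntervalDemo {

    static long resolveIntervalMillis(Optional<Duration> configured) {
        return configured.map(Duration::toMillis).orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // Unset option: sentinel value
        System.out.println(resolveIntervalMillis(Optional.empty()));
        // Configured option: converted to milliseconds
        System.out.println(resolveIntervalMillis(Optional.of(Duration.ofSeconds(30))));
    }
}
```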
discoverOptionalDecodingFormat and discoverOptionalFormatFactory source code

```java
public <I, F extends DecodingFormatFactory<I>> Optional<DecodingFormat<I>> discoverOptionalDecodingFormat(
        Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
    return discoverOptionalFormatFactory(formatFactoryClass, formatOption)
            .map(
                    formatFactory -> {
                        String formatPrefix = formatPrefix(formatFactory, formatOption);
                        try {
                            return formatFactory.createDecodingFormat(
                                    context, projectOptions(formatPrefix));
                        } catch (Throwable t) {
                            throw new ValidationException(
                                    String.format(
                                            "Error creating scan format '%s' in option space '%s'.",
                                            formatFactory.factoryIdentifier(), formatPrefix),
                                    t);
                        }
                    });
}

private <F extends Factory> Optional<F> discoverOptionalFormatFactory(
        Class<F> formatFactoryClass, ConfigOption<String> formatOption) {
    final String identifier = allOptions.get(formatOption);
    if (identifier == null) {
        return Optional.empty();
    }
    // Directly filter out the format matching the identifier
    final F factory =
            discoverFactory(context.getClassLoader(), formatFactoryClass, identifier);
    String formatPrefix = formatPrefix(factory, formatOption);
    // log all used options of other factories
    consumedOptionKeys.addAll(
            factory.requiredOptions().stream()
                    .map(ConfigOption::key)
                    .map(k -> formatPrefix + k)
                    .collect(Collectors.toSet()));
    consumedOptionKeys.addAll(
            factory.optionalOptions().stream()
                    .map(ConfigOption::key)
                    .map(k -> formatPrefix + k)
                    .collect(Collectors.toSet()));
    return Optional.of(factory);
}
```
The logic here is the same as loading the connector above: discoverFactory (shown earlier, so not repeated here) loads all formats via SPI and filters them by factoryIdentifier; this time the match is json. Once the formatFactory is returned, format creation proceeds to JsonFormatFactory#createDecodingFormat, which actually creates the DecodingFormat object.
createDecodingFormat source code

```java
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
        DynamicTableFactory.Context context, ReadableConfig formatOptions) {
    // Validate the format options
    FactoryUtil.validateFactoryOptions(this, formatOptions);
    // Validate json.fail-on-missing-field and json.ignore-parse-errors
    validateDecodingFormatOptions(formatOptions);
    // Read json.fail-on-missing-field and json.ignore-parse-errors
    final boolean failOnMissingField = formatOptions.get(FAIL_ON_MISSING_FIELD);
    final boolean ignoreParseErrors = formatOptions.get(IGNORE_PARSE_ERRORS);
    // Read timestamp-format.standard
    TimestampFormat timestampOption = JsonOptions.getTimestampFormat(formatOptions);
    return new DecodingFormat<DeserializationSchema<RowData>>() {
        @Override
        public DeserializationSchema<RowData> createRuntimeDecoder(
                DynamicTableSource.Context context, DataType producedDataType) {
            final RowType rowType = (RowType) producedDataType.getLogicalType();
            final TypeInformation<RowData> rowDataTypeInfo =
                    context.createTypeInformation(producedDataType);
            return new JsonRowDataDeserializationSchema(
                    rowType,
                    rowDataTypeInfo,
                    failOnMissingField,
                    ignoreParseErrors,
                    timestampOption);
        }

        @Override
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }
    };
}
```

The logic here is also very simple: it first validates the format-related options in general, then the json.fail-on-missing-field and json.ignore-parse-errors options in particular, and after that creates the JsonRowDataDeserializationSchema object.
JsonRowDataDeserializationSchema source code

```java
public JsonRowDataDeserializationSchema(
        RowType rowType,
        TypeInformation<RowData> resultTypeInfo,
        boolean failOnMissingField,
        boolean ignoreParseErrors,
        TimestampFormat timestampFormat) {
    if (ignoreParseErrors && failOnMissingField) {
        throw new IllegalArgumentException(
                "JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
    }
    this.resultTypeInfo = checkNotNull(resultTypeInfo);
    this.failOnMissingField = failOnMissingField;
    this.ignoreParseErrors = ignoreParseErrors;
    this.runtimeConverter =
            new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
                    .createConverter(checkNotNull(rowType));
    this.timestampFormat = timestampFormat;
    boolean hasDecimalType =
            LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
    if (hasDecimalType) {
        objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
    }
    objectMapper.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), true);
}
```

The most important step in the constructor is creating the JsonToRowDataConverters object. The call chain here is fairly deep, so only the key methods are explained below.
createRowConverter source code

```java
public JsonToRowDataConverter createRowConverter(RowType rowType) {
    final JsonToRowDataConverter[] fieldConverters =
            rowType.getFields().stream()
                    .map(RowType.RowField::getType)
                    .map(this::createConverter)
                    .toArray(JsonToRowDataConverter[]::new);
    final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);

    return jsonNode -> {
        ObjectNode node = (ObjectNode) jsonNode;
        int arity = fieldNames.length;
        GenericRowData row = new GenericRowData(arity);
        for (int i = 0; i < arity; i++) {
            String fieldName = fieldNames[i];
            JsonNode field = node.get(fieldName);
            Object convertedField = convertField(fieldConverters[i], fieldName, field);
            row.setField(i, convertedField);
        }
        return row;
    };
}
```

Since the data is in JSON format, the top level is a ROW type, so a JsonToRowDataConverter is built first. For each field, a fieldConverter is created based on the type declared in the DDL, and each field is routed to the matching conversion method; for example, STRING data is handled by convertToString.
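The per-field converter pattern above can be sketched without Jackson or Flink. The following simplified re-creation uses plain Maps in place of JsonNode and Object arrays in place of GenericRowData (all names are hypothetical):

```java
import java.util.Map;

// A simplified sketch of createRowConverter: one converter per declared
// field, applied by position to build a row from a JSON-like object.
public class RowConverterDemo {

    @FunctionalInterface
    public interface FieldConverter {
        Object convert(Object jsonValue);
    }

    // Mirrors how fieldConverters[i] is applied to node.get(fieldName)
    public static Object[] convertRow(
            Map<String, Object> node, String[] fieldNames, FieldConverter[] converters) {
        Object[] row = new Object[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            Object field = node.get(fieldNames[i]); // a missing field yields null
            row[i] = field == null ? null : converters[i].convert(field);
        }
        return row;
    }

    public static void main(String[] args) {
        String[] names = {"id", "name"};
        FieldConverter[] convs = {
            v -> ((Number) v).longValue(), // BIGINT-like conversion
            Object::toString               // STRING-like conversion
        };
        Object[] row = convertRow(Map.of("id", 1, "name", "flink"), names, convs);
        System.out.println(row[0] + ", " + row[1]); // 1, flink
    }
}
```

In the real implementation, createConverter picks the converter by LogicalType, and convertField additionally handles missing-field and parse-error policies.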
convertToString source code

```java
private StringData convertToString(JsonNode jsonNode) {
    if (jsonNode.isContainerNode()) {
        return StringData.fromString(jsonNode.toString());
    } else {
        return StringData.fromString(jsonNode.asText());
    }
}
```

Note that STRING data must be returned as StringData; otherwise a ClassCastException is thrown at runtime. If you are interested, take a look at how the other types are handled.
At this point the JsonRowDataDeserializationSchema object is fully constructed. What follows is optimization and translation into a StreamGraph; from there on, the process is the same as for a job written with the DataStream API.
When data consumption actually starts, JsonRowDataDeserializationSchema#deserialize is invoked to deserialize each record.
deserialize source code

```java
@Override
public RowData deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    try {
        return convertToRowData(deserializeToJsonNode(message));
    } catch (Throwable t) {
        if (ignoreParseErrors) {
            return null;
        }
        throw new IOException(
                format("Failed to deserialize JSON '%s'.", new String(message)), t);
    }
}
```

The raw bytes are first deserialized into a JsonNode object.
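The error-handling contract above is worth dwelling on: with ignoreParseErrors enabled, a malformed record is silently dropped (null is returned); otherwise the job fails with an IOException. A Flink-free sketch of that contract, using a stand-in parser (all names hypothetical):

```java
import java.io.IOException;

// Sketch of the deserialize error-handling contract: ignoreParseErrors
// swallows bad records and returns null, otherwise an IOException is thrown.
public class DeserializeErrorDemo {

    // Stand-in "parser": accepts only strings that look like JSON objects
    static String parse(byte[] message) {
        String s = new String(message);
        if (!s.startsWith("{")) {
            throw new RuntimeException("not a JSON object");
        }
        return s;
    }

    static String deserialize(byte[] message, boolean ignoreParseErrors) throws IOException {
        if (message == null) {
            return null;
        }
        try {
            return parse(message);
        } catch (Throwable t) {
            if (ignoreParseErrors) {
                return null; // drop the bad record silently
            }
            throw new IOException(
                    String.format("Failed to deserialize JSON '%s'.", new String(message)), t);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(deserialize("{\"a\":1}".getBytes(), false)); // {"a":1}
        System.out.println(deserialize("oops".getBytes(), true));       // null
    }
}
```

Note that returning null means the record simply disappears from the pipeline, which is why json.ignore-parse-errors should be enabled deliberately, not by default.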
deserializeToJsonNode source code

```java
public JsonNode deserializeToJsonNode(byte[] message) throws IOException {
    return objectMapper.readTree(message);
}
```

As you can see, Flink uses Jackson internally to parse the data. The JsonNode is then converted into RowData.
convertToRowData source code

```java
public RowData convertToRowData(JsonNode message) {
    return (RowData) runtimeConverter.convert(message);
}
```

The converter invoked here is the same one that was built when JsonRowDataDeserializationSchema was constructed:

```java
return jsonNode -> {
    ObjectNode node = (ObjectNode) jsonNode;
    int arity = fieldNames.length;
    GenericRowData row = new GenericRowData(arity);
    for (int i = 0; i < arity; i++) {
        String fieldName = fieldNames[i];
        JsonNode field = node.get(fieldName);
        Object convertedField = convertField(fieldConverters[i], fieldName, field);
        row.setField(i, convertedField);
    }
    return row;
};
```

The final result is a GenericRowData, which is an implementation class of RowData, so it is effectively RowData. The deserialized record is then emitted downstream.
Summary

This article analyzed the source code behind the Flink SQL JSON format, from constructing JsonRowDataDeserializationSchema through deserializing data in deserialize. To keep the article short, only the most important code of each step is shown, and many details are skipped. If you are interested, try stepping through the code yourself; more implementation details may be covered in a future article.
Recommended reading
Best practices for real-time monitoring of Flink jobs
Best practices for real-time log collection with Flink on YARN
The brand-new Kafka connector in Flink 1.14.0
Consuming Kafka data with a custom deserialization class in Flink 1.14.0
If you found this article helpful, please like and share it; your support is my biggest motivation to keep writing.