
Real-Time Data Warehouse (2): DWD Layer - Data Processing


Contents
  Real-Time Data Warehouse (2): DWD Layer - Data Processing
  1. Data Sources
  2. User Behavior Logs
    2.1 Development Environment Setup: 1)Package structure 2)pom.xml 3)MyKafkaUtil.java 4)log4j.properties
    2.2 Functionality: 1)Code implementation 2)Deployment
  3. Business Data
    3.1 Functionality
    3.2 Dynamic Splitting: 1)Create the config table: create.sql 2)Config class: TableProcess.java 3)MysqlUtil.java 4)Constants class: GmallConfig.java 5)Main program: BaseDbApp.java 6)Custom splitting function: TableProcessFunction.java 7)HBase sink: DimSink.java 8)Custom-serialization Kafka sink
    3.4 Main Program: Process Summary
    3.5 Further Thoughts
  4. Overall Flow Diagram

Real-Time Data Warehouse (2): DWD Layer - Data Processing

1. Data Sources

The DWD layer is fed by the raw ODS data in Kafka: business data (ods_base_db) and log data (ods_base_log).

We read user behavior logs and business data from the Kafka ODS layer, apply light processing, and write the results back to Kafka as the DWD layer.

2. User Behavior Logs

2.1 Development Environment Setup

1)Package structure

2)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>03_gmall2021</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.12.0</flink.version>
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <!--  flink Web UI -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <!--Needed if checkpoints are saved to HDFS-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!--Flink uses slf4j as its logging facade; here log4j is used as the concrete logging implementation-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.2.0</version>
        </dependency>

        <!--Lombok plugin dependency-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <!--commons-beanutils is an Apache toolkit for working with Java beans; it makes reading and writing bean properties straightforward-->
        <dependency>
            <groupId>commons-beanutils</groupId>
            <artifactId>commons-beanutils</artifactId>
            <version>1.9.3</version>
        </dependency>

        <!--Guava contains core libraries widely relied on by Google's Java projects-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>29.0-jre</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>

        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
            <exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-databind</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.core</groupId>
                    <artifactId>jackson-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
            <version>2012_u6</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
3)MyKafkaUtil.java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class MyKafkaUtil {

    private static Properties properties = new Properties();

    private static String DEFAULT_TOPIC = "dwd_default_topic";

    static {
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    }

    /**
     * TODO Kafka sink with a custom serialization schema, so any record type can be written
     *
     * @return
     */
    public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties props = new Properties();
        // Kafka broker addresses
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // producer transaction timeout
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }

    /**
     * Get a producer that can only write String records
     *
     * @param topic target topic
     */
    public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
        return new FlinkKafkaProducer<String>(topic,
                new SimpleStringSchema(),
                properties);
    }


    /**
     * TODO Build a consumer (improved version)
     *
     * @param bootstrapServers Kafka broker addresses
     * @param topic            one topic, or several separated by commas
     * @param groupId          consumer group
     * @param isSecurity       whether the Kafka cluster requires SASL authentication
     * @param offsetStrategy   startup strategy: earliest, latest, or minutes to rewind from now
     * @return
     */
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String bootstrapServers, String topic, String groupId, String isSecurity, String offsetStrategy) {
        SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("flink.partition-discovery.interval-millis", "60000");
        // enable SASL authentication if the cluster requires it (username/password intentionally left blank here)
        if ("true".equalsIgnoreCase(isSecurity)) {
            props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"\" password=\"\";");
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.mechanism", "PLAIN");
        }
        // support consuming multiple topics
        String[] split = topic.split(",");
        List<String> topics = Arrays.asList(split);
        // Kafka consumer
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topics, simpleStringSchema, props);

        // startup mode: earliest, latest, or rewind N minutes via setStartFromTimestamp
        switch (offsetStrategy) {
            case "earliest":
                consumer.setStartFromEarliest();
                return consumer;
            case "latest":
                consumer.setStartFromLatest();
                return consumer;
            default:
                consumer.setStartFromTimestamp(System.currentTimeMillis() - Integer.valueOf(offsetStrategy) * 60 * 1000);
                return consumer;
        }
    }

    /**
     * Get a consumer
     *
     * @param topic   topic
     * @param groupId consumer group
     */
    public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {

        // set the consumer group
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        return new FlinkKafkaConsumer<String>(topic,
                new SimpleStringSchema(),
                properties);
    }

    // Build the Kafka connector options for a Flink SQL DDL statement
    public static String getKafkaDDL(String topic, String groupId) {
        return "'connector' = 'kafka', " +
                " 'topic' = '" + topic + "'," +
                " 'properties.bootstrap.servers' = 'hadoop102:9092', " +
                " 'properties.group.id' = '" + groupId + "', " +
                "  'format' = 'json', " +
                "  'scan.startup.mode' = 'latest-offset'";
    }
}
4)log4j.properties
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2.2 Functionality

The log data collected earlier is already stored in Kafka as the ODS layer for logs. The logs read from the Kafka ODS layer fall into three categories: page logs, start (app launch) logs, and display (exposure) logs. Although all three are user behavior data, they have completely different structures, so they must be split and handled separately. The split logs are written back to different Kafka topics, forming the log DWD layer.

Page logs go to the main stream, start logs to a start side output, and display logs to a display side output.

1) Read the ODS data from Kafka
2) Validate the new/returning-user flag
3) Split the stream
4) Write the results back to the Kafka DWD layer

1)Code implementation
package com.flink.realtime.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @description: TODO prepare the DWD layer for user behavior logs
 * @author: HaoWu
 * @create: 2021-06-22
 */
public class BaseLogApp {
    public static void main(String[] args) throws Exception {
        // TODO 1. Get the execution environment
        // 1.1 Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Set the parallelism to the number of Kafka partitions
        env.setParallelism(4);

        /*
        // 1.3 Configure checkpointing
        env.enableCheckpointing(5000L); // checkpoint every 5000 ms
        env.getCheckpointConfig().setCheckpointTimeout(60000L); // checkpoint timeout: 1 min
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoint mode, default: exactly_once
        // keep the last checkpoint when the job is cancelled normally
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_log_app"));
        // HDFS permission issue
        // Error: Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
        // Fix: the HDFS root is not writable by this user. Either 1. hadoop fs -chmod 777 /   or 2. System.setProperty("HADOOP_USER_NAME", "atguigu");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        */

        //TODO 2. Read the ods_base_log topic from Kafka
        String sourceTopic = "ods_base_log";
        String groupId = "base_log_app_group";
        //FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getFlinkKafkaConsumer(sourceTopic, groupId);
        FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", sourceTopic, groupId, "false", "earliest");
        DataStreamSource<String> kafkaDS = env.addSource(consumer);


        //TODO 3. Convert each record to a JSONObject
        // route dirty data to a side output
        OutputTag<String> dirty = new OutputTag<String>("DirtyData") {};
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context context, Collector<JSONObject> collector) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    // parse into a JSON object
                    collector.collect(jsonObject);
                } catch (Exception e) {
                    // a JSON parse error means dirty data; send it to the side output
                    context.output(dirty, value);
                }
            }
        });

        //jsonObjDS.print("json>>>>>>>>");


        //TODO 4. Key by device id (mid) and use keyed state to validate the new/returning-user flag
        //4.1 key the logs by mid
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlag = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
                .process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
                    // state recording whether this mid has been seen before
                    private ValueState<String> firstVisitDateState;
                    private SimpleDateFormat simpleDateFormat;

                    // initialize state
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("new-mid", String.class));
                        simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                    }

                    @Override
                    public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
                        // read the new-user flag: is_new: 1 -> new user, 0 -> returning user
                        String isNew = value.getJSONObject("common").getString("is_new");
                        // only records the client marks as new need to be validated
                        if ("1".equals(isNew)) {
                            // read the stored first-visit date and the event timestamp
                            String firstDate = firstVisitDateState.value();
                            Long ts = value.getLong("ts");
                            // if state already exists, this mid has visited before
                            if (firstDate != null) {
                                // fix the flag
                                value.getJSONObject("common").put("is_new", "0");
                            } else {
                                // first visit: record the date in state
                                firstVisitDateState.update(simpleDateFormat.format(ts));
                            }
                        }
                        out.collect(value);
                    }

                });
        // print for testing
        //jsonObjWithNewFlag.print();

        //TODO 5. Split start, display and page data with side outputs
        OutputTag<String> startOutPutTag = new OutputTag<String>("start"){}; // start logs
        OutputTag<String> displayOutputTag = new OutputTag<String>("display"){}; // display (exposure) logs
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlag.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
                // check whether this is a start log
                String start = value.getString("start");
                if (start != null && start.length() > 0) {
                    // start log
                    ctx.output(startOutPutTag, value.toJSONString());
                } else {
                    // anything that is not a start log is a page log
                    out.collect(value.toJSONString());

                    // extract the common fields, page info and timestamp
                    JSONObject common = value.getJSONObject("common");
                    JSONObject page = value.getJSONObject("page");
                    Long ts = value.getLong("ts");

                    // get the display (exposure) data
                    JSONArray displayArr = value.getJSONArray("displays");

                    if (displayArr != null && displayArr.size() > 0) {
                        JSONObject displayObj = new JSONObject();
                        displayObj.put("common", common);
                        displayObj.put("page", page);
                        displayObj.put("ts", ts);
                        // iterate over the display entries
                        for (Object display : displayArr) {
                            displayObj.put("display", display);
                            // emit each display record to the display side output
                            ctx.output(displayOutputTag, displayObj.toJSONString());
                        }

                    }

                }
            }
        });

        //TODO 6. Write the three streams to Kafka
        // print for debugging
        jsonObjDS.getSideOutput(dirty).print("Dirty>>>>>>>>>>>");
        // main stream: page logs
        pageDS.print("Page>>>>>>>>>>>");
        // side output: start logs
        pageDS.getSideOutput(startOutPutTag).print("Start>>>>>>>>>>>>");
        // side output: display logs
        pageDS.getSideOutput(displayOutputTag).print("Display>>>>>>>>>>>>>");

        // write to Kafka
        String pageSinkTopic = "dwd_page_log";
        String startSinkTopic = "dwd_start_log";
        String displaySinkTopic = "dwd_display_log";
        pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(pageSinkTopic));
        pageDS.getSideOutput(startOutPutTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(startSinkTopic));
        pageDS.getSideOutput(displayOutputTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(displaySinkTopic));

        env.execute();
    }
}
2)Deployment

BaseLogApp.sh

#!/bin/bash

source  ~/.bashrc

cd $(dirname $0)
day=$(date +%Y%m%d%H%M)

#flink
job_name=02_dwd_BaseLogApp
clazz=com.flink.realtime.app.dwd.BaseLogApp
jar_path=/opt/module/gmall-flink/03_gmall2021-1.0-SNAPSHOT-jar-with-dependencies.jar

#-----------------------run----------------------------------------------
#YARN per-job mode
/opt/module/flink-1.12.0/bin/flink run \
-t yarn-per-job \
-Dyarn.application.name=${job_name} \
-Dyarn.application.queue=default \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=2 \
-c ${clazz} ${jar_path}

3. Business Data

3.1 Functionality

Changes in the business data are captured with Flink CDC, but Flink CDC writes everything into a single topic, mixing fact data with dimension data, which makes downstream processing awkward. This job therefore reads from the business-data ODS layer in Kafka, processes the records, stores dimension data in HBase, and writes fact data back to Kafka as the business-data DWD layer.

3.2 Dynamic Splitting

Because Flink CDC writes every table into a single topic, which is clearly inconvenient for later processing, each table has to be split out and handled on its own. The tables also differ in nature: some are dimension tables and some are fact tables.

In real-time processing, dimension data is usually written to a store that supports fast primary-key lookups, such as HBase, Redis, or MySQL, while fact data stays in the stream for further processing and is eventually joined into wide tables.

This routing information should not live in a static configuration file, because every time the business side adds a table the configuration would have to be changed and the job restarted. What we need is a dynamic configuration scheme: the configuration is stored durably, and the streaming job picks up changes automatically.

There are two ways to implement this:

One is to store the configuration in ZooKeeper and detect changes through watches.

The other is to store it in a MySQL database and sync it periodically.

We choose the second option here, mainly because MySQL is convenient for initializing and maintaining the configuration data. The configuration table can also be read with Flink CDC and connected to the main stream as a broadcast stream.
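
For reference, a broadcast-based variant could look roughly like the sketch below. This is not the implementation used later in this article (BaseDbApp polls MySQL with a Timer inside TableProcessFunction); it is a hedged sketch that assumes the flink-connector-mysql-cdc 1.2.0 dependency from the pom above, reuses env, filterDS and hbaseTag from BaseDbApp further down, and simplifies the parsing of the CDC record into a TableProcess bean.

// Sketch only: broadcast the table_process config to the main stream via Flink CDC.
// Hostnames and credentials are illustrative, matching the rest of this article.
DebeziumSourceFunction<String> tableProcessSource = MySQLSource.<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("root")
        .databaseList("gmall-realtime")
        .tableList("gmall-realtime.table_process")
        .deserializer(new StringDebeziumDeserializationSchema())
        .startupOptions(StartupOptions.initial()) // take an initial snapshot, then follow the binlog
        .build();
DataStreamSource<String> tableProcessDS = env.addSource(tableProcessSource);

// Broadcast state keyed by "sourceTable:operateType"
MapStateDescriptor<String, TableProcess> configDescriptor =
        new MapStateDescriptor<>("table-process-state", String.class, TableProcess.class);
BroadcastStream<String> configBroadcast = tableProcessDS.broadcast(configDescriptor);

// Connect the filtered business stream with the broadcast config stream
SingleOutputStreamOperator<JSONObject> splitDS = filterDS
        .connect(configBroadcast)
        .process(new BroadcastProcessFunction<JSONObject, String, JSONObject>() {
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                // In practice a custom DebeziumDeserializationSchema would emit the row's "after" image as JSON;
                // this sketch assumes `value` is already that JSON string.
                TableProcess tableProcess = JSON.parseObject(value, TableProcess.class);
                BroadcastState<String, TableProcess> state = ctx.getBroadcastState(configDescriptor);
                state.put(tableProcess.getSourceTable() + ":" + tableProcess.getOperateType(), tableProcess);
            }

            @Override
            public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
                // Look up the config for this record and route it, mirroring TableProcessFunction below
                TableProcess tableProcess = ctx.getBroadcastState(configDescriptor)
                        .get(value.getString("table") + ":" + value.getString("type"));
                if (tableProcess != null) {
                    value.put("sink_table", tableProcess.getSinkTable());
                    if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
                        ctx.output(hbaseTag, value);
                    } else if (TableProcess.SINK_TYPE_KAFKA.equals(tableProcess.getSinkType())) {
                        out.collect(value);
                    }
                }
            }
        });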

1)Create the config table: create.sql
-- config table
CREATE TABLE `table_process` (
  `source_table` varchar(200) NOT NULL COMMENT 'source table',
  `operate_type` varchar(200) NOT NULL COMMENT 'operation type: insert, update, delete',
  `sink_type` varchar(200) DEFAULT NULL COMMENT 'sink type: hbase or kafka',
  `sink_table` varchar(200) DEFAULT NULL COMMENT 'sink table (or topic)',
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT 'columns to keep',
  `sink_pk` varchar(200) DEFAULT NULL COMMENT 'primary key column',
  `sink_extend` varchar(200) DEFAULT NULL COMMENT 'extra table-creation options',
  PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Brand dimension table:

CREATE TABLE `base_trademark` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `tm_name` varchar(100) NOT NULL COMMENT 'trademark name',
  `logo_url` varchar(200) DEFAULT NULL COMMENT 'path to the brand logo image',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='brand table'
-- base_trademark: for insert operations, write only the `id`,`name` columns into the HBase table dim_base_trademark, with `id` as the primary key
insert into table_process values ('base_trademark','insert','hbase','dim_base_trademark','id,name','id','');
mysql> select * from table_process;
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
| source_table   | operate_type | sink_type | sink_table         | sink_columns | sink_pk | sink_extend |
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
| base_trademark | insert       | hbase     | dim_base_trademark | id,name      | id      |             |
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
1 row in set (0.00 sec)
2)Config class: TableProcess.java
import lombok.Data;

/**
 * @description: TODO config table entity class
 * @author: HaoWu
 * @create: 2021-06-25
 */

@Data
public class TableProcess {
    // dynamic-split sink type constants
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    // source table
    String sourceTable;
    // operation type: insert, update, delete
    String operateType;
    // sink type: hbase or kafka
    String sinkType;
    // sink table (or topic)
    String sinkTable;
    // columns to keep
    String sinkColumns;
    // primary key column
    String sinkPk;
    // extra table-creation options
    String sinkExtend;
}

3)MysqlUtil.java
import com.flink.realtime.bean.TableProcess;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.base.CaseFormat;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

/**
 * @description: TODO MySQL utility class
 * @author: HaoWu
 * @create: 2021-07-23
 * Performs ORM (object-relational mapping):
 * O: Object   - a Java object
 * R: Relation - a relational database table
 * M: Mapping  - maps Java objects to rows in relational tables
 */
public class MysqlUtil {

    /**
     * @param sql               SQL statement to execute
     * @param clazz             bean type to map each row into
     * @param underScoreToCamel whether to convert column names to camelCase
     * @param <T>
     * @return
     */
    public static <T> List<T> queryList(String sql, Class<T> clazz, Boolean underScoreToCamel) {
        Connection con = null;
        PreparedStatement ps = null;
        ResultSet rs = null;
        try {
            // register the JDBC driver
            Class.forName("com.mysql.jdbc.Driver");
            // open the connection
            con = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall-realtime?characterEncoding=utf-8&useSSL=false", "root", "root");
            // create the statement
            ps = con.prepareStatement(sql);
            // execute the query
            rs = ps.executeQuery();
            // process the result set and build the list of beans
            ResultSetMetaData metaData = rs.getMetaData(); // result set metadata
            ArrayList<T> resultList = new ArrayList<>();
            while (rs.next()) {
                // map a single row to a bean
                T obj = clazz.newInstance();
                // iterate over all columns (JDBC columns are 1-based and inclusive of the last column)
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    String columnName = metaData.getColumnName(i);
                    String propertyName = "";
                    if (underScoreToCamel) {
                        // use Guava's CaseFormat to convert the column name to camelCase
                        propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName);
                    }
                    // set the bean property
                    BeanUtils.setProperty(obj,propertyName,rs.getObject(i));
                }
                resultList.add(obj);
            }
            return resultList;
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Failed to query data from MySQL");
        } finally {
            // release resources
            if (rs != null) {
                try {
                    rs.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
            if (con != null) {
                try {
                    con.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
        }
    }


    public static void main(String[] args) throws InvocationTargetException, IllegalAccessException {
        String sql="select * from table_process";
        List<TableProcess> list = MysqlUtil.queryList(sql, TableProcess.class, true);
        System.out.println(list);
        TableProcess tableProcess = new TableProcess();
        BeanUtils.setProperty(tableProcess,"sourceTable","redis");
        System.out.println(tableProcess);
    }
}
4)Constants class: GmallConfig.java
package com.flink.realtime.common;

/**
 * @description: TODO constants / configuration class
 * @author: HaoWu
 * @create: 2021-06-25
 */
public class GmallConfig {
    // Phoenix schema name
    public static final String HBASE_SCHEMA = "bigdata";

    // Phoenix driver
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";

    // Phoenix connection URL
    public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";

    public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";

    public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop102:8123/default";
}
5)Main program: BaseDbApp.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.app.func.DimSink;
import com.flink.realtime.app.func.TableProcessFunction;
import com.flink.realtime.bean.TableProcess;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;

/**
 * @description: TODO prepare the DWD layer for business data
 * @author: HaoWu
 * @create: 2021-06-22
 */
public class BaseDbApp {
    public static void main(String[] args) throws Exception {
        // TODO 1. Create the execution environment
        // 1.1 Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 1.2 Set the parallelism
        env.setParallelism(1);
        /*
        // 1.3 Configure checkpointing
        env.enableCheckpointing(5000L); // checkpoint every 5000 ms
        env.getCheckpointConfig().setCheckpointTimeout(60000L); // checkpoint timeout: 1 min
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // checkpoint mode, default: exactly_once
        // keep the last checkpoint when the job is cancelled normally
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // restart strategy
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
        // state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_db_app"));
        // HDFS permission issue
        // Error: Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
        // Fix: the HDFS root is not writable by this user. Either 1. hadoop fs -chmod 777 /   or 2. System.setProperty("HADOOP_USER_NAME", "atguigu");
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        */

        // TODO 2. Read the ODS-layer business data from Kafka: ods_base_db
        String ods_db_topic = "ods_base_db";
        FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", ods_db_topic, "ods_base_db_consumer1", "false", "latest");
        DataStreamSource<String> jsonStrDS = env.addSource(kafkaConsumer);
        //jsonStrDS.print();
        // TODO 3. Convert the structure of jsonStrDS
        SingleOutputStreamOperator<JSONObject> jsonDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));

        // TODO 4. Basic ETL filtering
        SingleOutputStreamOperator<JSONObject> filterDS = jsonDS.filter(
                json -> {
                    boolean flag = json.getString("table") != null // table name is not null
                            && json.getString("data") != null  // data is not null
                            && json.getString("data").length() >= 3; // data has at least 3 characters, i.e. not an empty payload
                    return flag;
                }
        );
        //filterDS.print("filterDS>>>>>>>>>>");
        // TODO 5. Dynamic split: fact tables -> main stream -> Kafka DWD layer; dimension tables -> side output -> HBase
        // 5.1 Define the side-output tag for records going to HBase
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE) {
        };
        // 5.2 Main stream goes to Kafka
        SingleOutputStreamOperator<JSONObject> kafkaDS = filterDS.process(new TableProcessFunction(hbaseTag));
        // 5.3 Get the side output that goes to HBase
        DataStream<JSONObject> hbaseDS = kafkaDS.getSideOutput(hbaseTag);

        kafkaDS.print("fact:kafkaDS>>>>>>>>");
        hbaseDS.print("dim:hbaseDS>>>>>>>>");

        // TODO 6. Save dimension data to HBase
        hbaseDS.addSink(new DimSink());
        // TODO 7. Save fact data to Kafka with a custom serialization schema
        FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public void open(SerializationSchema.InitializationContext context) throws Exception {
                System.out.println("kafka序列化");
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
                String sink_topic = jsonObject.getString("sink_table");
                JSONObject data = jsonObject.getJSONObject("data");
                return new ProducerRecord<>(sink_topic, data.toString().getBytes());
            }
        });
        kafkaDS.addSink(kafkaSink);
        // TODO 8. Execute
        env.execute();
    }
}
6)Custom splitting function: TableProcessFunction.java
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.bean.TableProcess;
import com.flink.realtime.common.GmallConfig;
import com.flink.realtime.utils.MysqlUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.List;

/**
 * @description: TODO custom ProcessFunction that dynamically splits the business data
 * @author: HaoWu
 * @create: 2021-07-26
 */
public class TableProcessFunction extends ProcessFunction<JSONObject, JSONObject> {

    // side-output tag for dimension-table records
    private OutputTag<JSONObject> outputTag;

    // in-memory config cache {table key -> table config}
    private Map<String, TableProcess> tableProcessMap = new HashMap<>();

    // in-memory record of HBase tables that already exist
    private Set<String> existsTables = new HashSet<>();

    // Phoenix connection
    private Connection connection;

    public TableProcessFunction() {
    }

    public TableProcessFunction(OutputTag<JSONObject> outputTag) {
        this.outputTag = outputTag;
    }


    @Override
    public void open(Configuration parameters) throws Exception {
        // initialize the Phoenix connection
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);

        // load the config table into memory
        initTableProcessMap();
        // the config table can change, so schedule a refresh every 5000 ms, starting 5000 ms from now
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                initTableProcessMap();
            }
        }, 5000, 5000);
    }


    @Override
    public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
        // table name
        String tableName = jsonObj.getString("table");
        // operation type
        String type = jsonObj.getString("type");
        // note: fix for Maxwell historical syncs, where the operation type is bootstrap-insert
        if ("bootstrap-insert".equals(type)) {
            type = "insert";
            jsonObj.put("type", type);
        }

        if (tableProcessMap != null && tableProcessMap.size() > 0) {
            // look up the config entry by key
            String key = tableName + ":" + type;
            TableProcess tableProcess = tableProcessMap.get(key);
            // check whether a config entry exists
            if (tableProcess != null) {
                // sink_table says where this record goes: dimension data -> HBase, fact data -> Kafka; tag the record with it
                jsonObj.put("sink_table", tableProcess.getSinkTable());
                // if sink_columns is configured, keep only those fields
                String sinkColumns = tableProcess.getSinkColumns();
                if (sinkColumns != null && sinkColumns.length() > 0) {
                    filterColumn(jsonObj.getJSONObject("data"), sinkColumns);
                }
            } else {
                System.out.println("No config found in MySQL for key <<<< " + key + " >>>>");
            }

            // route to the appropriate stream based on sinkType
            if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)) {
                // sinkType=hbase -> side output
                ctx.output(outputTag, jsonObj);
            } else if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)) {
                // sinkType=kafka -> main stream
                out.collect(jsonObj);
            }

        }
    }


    /**
     * Query the config from MySQL and cache it in the in-memory map
     */
    private void initTableProcessMap() {
        System.out.println("refreshing config table");
        //1. query the config rows from MySQL
        List<TableProcess> tableProcesses = MysqlUtil.queryList("select * from table_process", TableProcess.class, true);
        for (TableProcess tableProcess : tableProcesses) {
            String sourceTable = tableProcess.getSourceTable(); // source table
            String operateType = tableProcess.getOperateType(); // operation type
            String sinkType = tableProcess.getSinkType(); // sink type
            String sinkTable = tableProcess.getSinkTable(); // sink table name
            String sinkPk = tableProcess.getSinkPk(); // sink primary key
            String sinkColumns = tableProcess.getSinkColumns(); // sink columns
            String sinkExtend = tableProcess.getSinkExtend(); // table-creation extension
            //2. put the config entry into the map
            tableProcessMap.put(sourceTable + ":" + operateType, tableProcess);
            //3. check whether this sink table has been seen before
            // only tables written to HBase on insert need a create check
            if ("insert".equals(operateType) && "hbase".equals(sinkType)) {
                boolean firstSeen = existsTables.add(sinkTable); // add() returns true only if the table was not tracked yet
                //4. if the table has not been seen yet, create it in HBase
                if (firstSeen) {
                    checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
                }
            }
        }
    }

    /**
     * Create an HBase table through Phoenix
     *
     * @param tableName table name
     * @param columns   column list
     * @param pk        primary key
     * @param extend    extra table-creation options
     */
    private void checkTable(String tableName, String columns, String pk, String extend) {
        // default primary key if none is configured
        if (pk == null) {
            pk = "id";
        }
        // default extension if none is configured
        if (extend == null) {
            extend = "";
        }
        // build the CREATE TABLE statement
        StringBuilder createSql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + tableName + "(");
        // append the columns
        String[] fieldArr = columns.split(",");
        for (int i = 0; i < fieldArr.length; i++) {
            String field = fieldArr[i];
            // is this field the primary key?
            if (field.equals(pk)) {
                createSql.append(field).append(" varchar primary key ");
            } else {
                createSql.append("info.").append(field).append(" varchar");
            }
            // every field except the last needs a trailing comma
            if (i < fieldArr.length - 1) {
                createSql.append(",");
            }
        }
        createSql.append(")");
        createSql.append(extend);
        System.out.println("建表sql:" + createSql);
        //通過Phoenix創建hbase表
        PreparedStatement ps = null;
        try {
            ps = connection.prepareStatement(createSql.toString());
            ps.execute();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            throw new RuntimeException("建表失?。。。。?!" + tableName);
        } finally {
            if (ps != null) {
                try {
                    ps.close();
                } catch (SQLException throwables) {
                    throwables.printStackTrace();
                }
            }
        }
    }

    /**
     * Keep only the fields listed in the config table
     *
     * @param data        one data record
     * @param sinkColumns comma-separated list of fields to keep
     */
    private void filterColumn(JSONObject data, String sinkColumns) {
        // fields to keep
        String[] columns = sinkColumns.split(",");
        // convert the array to a list so we can use contains()
        List<String> columnList = Arrays.asList(columns);
        // each key/value pair in the JSON object is exposed as a Map.Entry
        Set<Map.Entry<String, Object>> entrySet = data.entrySet();
        /*for (Map.Entry<String, Object> entry : entrySet) {
            if (!columnList.contains(entry.getKey())) {
                entrySet.remove();
            }  removing while iterating with a for-each loop fails; use an iterator instead
        }*/
        Iterator<Map.Entry<String, Object>> iterator = entrySet.iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, Object> entry = iterator.next();
            if (!columnList.contains(entry.getKey())) {
                iterator.remove();
            }
        }
    }
}
7)HBase sink: DimSink.java
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.common.GmallConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;

/**
 * @description: TODO HBase sink: writes data to HBase tables through Phoenix
 * @author: HaoWu
 * @create: 2021-07-30
 */
public class DimSink extends RichSinkFunction<JSONObject> {

    // Phoenix connection
    Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);

    }

    /**
     * Build the upsert statement and submit it to HBase
     *
     * @param jsonObj
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(JSONObject jsonObj, Context context) {
        String sinkTableName = jsonObj.getString("sink_table");
        JSONObject dataObj = jsonObj.getJSONObject("data");
        if (dataObj != null && dataObj.size() > 0) {
            String upsertSql = genUpdateSql(sinkTableName.toUpperCase(), jsonObj.getJSONObject("data"));
            System.out.println(upsertSql);
            try {
                PreparedStatement ps = connection.prepareStatement(upsertSql);
                ps.executeUpdate();
                connection.commit();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
                throw new RuntimeException("執行upsert語句失?。。?!");
            }
        }
    }

    /**
     * Generate the upsert statement
     *
     * @param sinkTableName
     * @param data
     * @return
     */
    private String genUpdateSql(String sinkTableName, JSONObject data) {
        Set<String> fields = data.keySet();
        String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTableName + " (" + StringUtils.join(fields, ",") + ")";
        String valuesSql = " values ('" + StringUtils.join(data.values(), "','") + "')";
        return upsertSql + valuesSql;
    }
}
8)Custom-serialization Kafka sink
    /**
     * TODO Kafka sink with a custom serialization schema, so any record type can be written
     *
     * @return
     */
    public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        Properties props = new Properties();
        // Kafka broker addresses
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // producer transaction timeout
        props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }


        // TODO 7. Save fact data to Kafka with a custom serialization schema
        FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public void open(SerializationSchema.InitializationContext context) throws Exception {
                System.out.println("kafka序列化");
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
                String sink_topic = jsonObject.getString("sink_table");
                JSONObject data = jsonObject.getJSONObject("data");
                return new ProducerRecord<>(sink_topic, data.toString().getBytes());
            }
        });

3.4 Main Program: Process Summary

TableProcessFunction is a custom operator whose work runs along three timelines (shown as the purple, green, and black lines in the flow diagram):

The purple line is independent of incoming data and runs as soon as the job starts. Its entry point is open(), which executes at job startup; its main work is initializing connections and starting the periodic schedule.

The green line is also independent of incoming data: once the periodic schedule is started it runs automatically at a fixed interval. Its main task is syncing the config table into tableProcessMap, implemented with the Timer created in open(). A secondary task is creating the target table automatically (per the configuration) if it does not exist yet.

The black line runs continuously as data flows in: each incoming record is tagged according to the in-memory tableProcessMap and stripped of unneeded fields.

3.5 Further Thoughts

1. The current config handling only recognizes newly added config rows; it does not support modifying (or removing) existing ones. How could that be improved?
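
One possible direction (a sketch, not the article's implementation) is to rebuild the whole map on each poll and swap the reference in one step, so that edited and deleted rows in table_process take effect within one polling interval. This assumes tableProcessMap is declared volatile; the rest reuses the helpers already shown above.

    // Sketch: rebuild the config cache on every refresh so updates and deletes take effect.
    // Assumes the field is declared as: private volatile Map<String, TableProcess> tableProcessMap = new HashMap<>();
    private void initTableProcessMap() {
        List<TableProcess> tableProcesses =
                MysqlUtil.queryList("select * from table_process", TableProcess.class, true);
        Map<String, TableProcess> newMap = new HashMap<>();
        for (TableProcess tableProcess : tableProcesses) {
            newMap.put(tableProcess.getSourceTable() + ":" + tableProcess.getOperateType(), tableProcess);
            // table-creation check stays the same as in the original code
            if ("insert".equals(tableProcess.getOperateType())
                    && TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())
                    && existsTables.add(tableProcess.getSinkTable())) {
                checkTable(tableProcess.getSinkTable(), tableProcess.getSinkColumns(),
                        tableProcess.getSinkPk(), tableProcess.getSinkExtend());
            }
        }
        // swap in one step: processElement() always sees a complete, consistent snapshot
        tableProcessMap = newMap;
    }

A broadcast-state version like the one sketched in 3.2 avoids the polling delay altogether, since config changes are pushed to every subtask as CDC events.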

4. Overall Flow Diagram
