实时数仓(二):DWD层-数据处理
目錄實時數倉(二):DWD層-數據處理1.數據源2.用戶行為日志2.1開發環境搭建1)包結構2)pom.xml3)MykafkaUtil.java4)log4j.properties2.2 實現功能1)代碼實現2)部署運行3.業務數據3.1 實現功能3.2 動態分流1)建配置表:create.sql2)配置類:TableProcess.java3)MysqlUtil.java4)常量類:GmallConfig.java5)主程序:BaseDBApp.java6)自定義分流函數:TableProcessFunction.java7)HbaseSink:DimSink.java8)自定義序列化 kafka sink3.4 主程序:流程總結分析3.5 思考4.整體流程圖分析
實時數倉(二):DWD層-數據處理
1.數據源
dwd的數據來自Kafka的ods層原始數據:業務數據(ods_base_db) 、日志數據(ods_base_log)
從Kafka的ODS層讀取用戶行為日志以及業務數據,并進行簡單處理,寫回到Kafka作為DWD層。
2.用戶行為日志
2.1開發環境搭建
1)包結構
2)pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>03_gmall2021</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<flink.version>1.12.0</flink.version>
<scala.version>2.12</scala.version>
<hadoop.version>3.1.3</hadoop.version>
</properties>
<dependencies>
<!-- flink Web UI -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<!--如果保存檢查點到hdfs上,需要引入此依賴-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--Flink默認使用的是slf4j記錄日志,相當于一個日志的接口,我們這里使用log4j作為具體的日志實現-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<!--lomback插件依賴-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--commons-beanutils是Apache開源組織提供的用于操作JAVA BEAN的工具包。
使用commons-beanutils,我們可以很方便的對bean對象的屬性進行操作-->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<!--Guava工程包含了若干被Google的Java項目廣泛依賴的核心庫,方便開發-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.3.0</version>
</dependency>
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.4</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.janeluo</groupId>
<artifactId>ikanalyzer</artifactId>
<version>2012_u6</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
3)MykafkaUtil.java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class MyKafkaUtil {
private static Properties properties = new Properties();
private static String DEFAULT_TOPIC = "dwd_default_topic";
static {
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
}
/**
* todo kafka sink:自定義序列化,各種類型自定義傳輸
*
* @return
*/
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
Properties props = new Properties();
//kafka地址
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
//生產數據超時時間
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
/**
* 獲取生產者對象 ,只能傳輸String類型
*
* @param topic 主題
*/
public static FlinkKafkaProducer<String> getFlinkKafkaProducer(String topic) {
return new FlinkKafkaProducer<String>(topic,
new SimpleStringSchema(),
properties);
}
/**
* todo 構建消費者 -> 優化
*
* @param bootstrapServers:kafka地址
* @param topic :topic可以用逗號分隔
* @param groupId:消費者組
* @param isSecurity:是否kafka設置sasl
* @param offsetStrategy:消費策略:3種
* @return
*/
public static FlinkKafkaConsumer<String> getKafkaConsumer(String bootstrapServers, String topic, String groupId, String isSecurity, String offsetStrategy) {
SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
Properties props = new Properties();
props.setProperty("bootstrap.servers", bootstrapServers);
props.setProperty("group.id", groupId);
props.setProperty("flink.partition-discovery.interval-millis", "60000");
//kafka開啟sasl認證
if ("true".equalsIgnoreCase(isSecurity)) {
props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";");
props.setProperty("security.protocol", "SASL_PLAINTEXT");
props.setProperty("sasl.mechanism", "PLAIN");
}
//消費多個topic
String[] split = topic.split(",");
List<String> topics = Arrays.asList(split);
//kafka消費者
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topics, simpleStringSchema, props);
//消費方式:earliest,latest,setStartFromTimestamp
switch (offsetStrategy) {
case "earliest":
consumer.setStartFromEarliest();
return consumer;
case "latest":
consumer.setStartFromLatest();
return consumer;
default:
consumer.setStartFromTimestamp(System.currentTimeMillis() - Integer.valueOf(offsetStrategy) * 60 * 1000);
return consumer;
}
}
/**
* 獲取消費者對象
*
* @param topic 主題
* @param groupId 消費者組
*/
public static FlinkKafkaConsumer<String> getFlinkKafkaConsumer(String topic, String groupId) {
//添加消費組屬性
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return new FlinkKafkaConsumer<String>(topic,
new SimpleStringSchema(),
properties);
}
//拼接Kafka相關屬性到DDL
public static String getKafkaDDL(String topic, String groupId) {
return "'connector' = 'kafka', " +
" 'topic' = '" + topic + "'," +
" 'properties.bootstrap.servers' = 'hadoop102:9092', " +
" 'properties.group.id' = '" + groupId + "', " +
" 'format' = 'json', " +
" 'scan.startup.mode' = 'latest-offset'";
}
}
4)log4j.properties
log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
2.2 實現功能
我們前面采集的日志數據已經保存到Kafka中,作為日志數據的ODS層,從Kafka的ODS層讀取的日志數據分為3類, 頁面日志、啟動日志和曝光日志。這三類數據雖然都是用戶行為數據,但是有著完全不一樣的數據結構,所以要拆分處理。將拆分后的不同的日志寫回Kafka不同主題中,作為日志DWD層。
頁面日志輸出到主流,啟動日志輸出到啟動側輸出流,曝光日志輸出到曝光側輸出流
1)從kafka讀取ods數據
2)判斷新老用戶
3)分流
4)寫回到kafka的dwd層
1)代碼實現
package com.flink.realtime.app.dwd;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* @description: todo->準備用戶行為日志dwd層
* @author: HaoWu
* @create: 2021年06月22日
*/
public class BaseLogApp {
public static void main(String[] args) throws Exception {
// TODO 1.獲取執行環境
// 1.1 創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.2 并行度設置為Kafka的分區數
env.setParallelism(4);
/*
// 1.3 設置checkpoint
env.enableCheckpointing(5000L); //每5000ms做一次ck
env.getCheckpointConfig().setCheckpointTimeout(60000L); // ck超時時間:1min
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //ck模式,默認:exactly_once
//正常Cancel任務時,保留最后一次CK
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//重啟策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//狀態后端:
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_log_app"));
// 訪問hdfs訪問權限問題
// 報錯異常:Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
// 解決:/根目錄沒有寫權限 解決方案1.hadoop fs -chown 777 / 2.System.setProperty("HADOOP_USER_NAME", "atguigu");
System.setProperty("HADOOP_USER_NAME", "atguigu");
*/
//TODO 2.獲取kafka ods_base_log 主題數據
String sourceTopic = "ods_base_log";
String groupId = "base_log_app_group";
//FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getFlinkKafkaConsumer(sourceTopic, groupId);
FlinkKafkaConsumer<String> consumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", sourceTopic, groupId, "false", "earliest");
DataStreamSource<String> kafkaDS = env.addSource(consumer);
//TODO 3.將每行數據轉換為JSONObject
// 處理臟數據
OutputTag<String> dirty = new OutputTag<String>("DirtyData") {};
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
@Override
public void processElement(String value, Context context, Collector<JSONObject> collector) throws Exception {
try {
JSONObject jsonObject = JSON.parseObject(value);
//轉JSON對象
collector.collect(jsonObject);
} catch (Exception e) {
//JSON解析異常輸出臟數據
context.output(dirty, value);
}
}
});
//jsonObjDS.print("json>>>>>>>>");
//TODO 4.按照設備ID分組、使用狀態編程做新老用戶校驗
//4.1 根據mid對日志進行分組
SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlag = jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
.process(new KeyedProcessFunction<String, JSONObject, JSONObject>() {
//聲明狀態用于表示當前Mid是否已經訪問過
private ValueState<String> firstVisitDateState;
private SimpleDateFormat simpleDateFormat;
//初始化狀態
@Override
public void open(Configuration parameters) throws Exception {
firstVisitDateState = getRuntimeContext().getState(new ValueStateDescriptor<String>("new-mid", String.class));
simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
}
@Override
public void processElement(JSONObject value, Context ctx, Collector<JSONObject> out) throws Exception {
//取出新用戶標記 :is_new:1->新用戶 ,0->老用戶
String isNew = value.getJSONObject("common").getString("is_new");
//如果當前前端傳輸數據表示為新用戶,則進行校驗
if ("1".equals(isNew)) {
//取出狀態數據并取出當前訪問時間
String firstDate = firstVisitDateState.value();
Long ts = value.getLong("ts");
//判斷狀態數據是否為Null
if (firstDate != null) {
//修復
value.getJSONObject("common").put("is_new", "0");
} else {
//更新狀態
firstVisitDateState.update(simpleDateFormat.format(ts));
}
}
out.collect(value);
}
});
//測試打印
//jsonObjWithNewFlag.print();
//TODO 5.使用側輸出流將 啟動、曝光、頁面數據分流
OutputTag<String> startOutPutTag = new OutputTag<String>("start"){}; //啟動
OutputTag<String> displayOutputTag = new OutputTag<String>("display"){}; //曝光
SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlag.process(new ProcessFunction<JSONObject, String>() {
@Override
public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
//判斷是否為啟動數據
String start = value.getString("start");
if (start != null && start.length() > 0) {
//啟動數據
ctx.output(startOutPutTag, value.toJSONString());
} else {
//不是啟動數據一定是頁面數據
out.collect(value.toJSONString());
//抽取公共字段、頁面信息、時間戳
JSONObject common = value.getJSONObject("common");
JSONObject page = value.getJSONObject("page");
Long ts = value.getLong("ts");
//獲取曝光數據
JSONArray displayArr = value.getJSONArray("displays");
if (displayArr != null && displayArr.size() > 0) {
JSONObject displayObj = new JSONObject();
displayObj.put("common", common);
displayObj.put("page", page);
displayObj.put("ts", ts);
//遍歷曝光信息
for (Object display : displayArr) {
displayObj.put("display", display);
//輸出曝光數據到側輸出流
ctx.output(displayOutputTag, displayObj.toJSONString());
}
}
}
}
});
//TODO 6.將三個流的數據分別寫入Kafka
//打印
jsonObjDS.getSideOutput(dirty).print("Dirty>>>>>>>>>>>");
//主流:頁面
pageDS.print("Page>>>>>>>>>>>");
//側流:啟動
pageDS.getSideOutput(startOutPutTag).print("Start>>>>>>>>>>>>");
//側流:曝光
pageDS.getSideOutput(displayOutputTag).print("Display>>>>>>>>>>>>>");
//輸出到kafka
String pageSinkTopic = "dwd_page_log";
String startSinkTopic = "dwd_start_log";
String displaySinkTopic = "dwd_display_log";
pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(pageSinkTopic));
pageDS.getSideOutput(startOutPutTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(startSinkTopic));
pageDS.getSideOutput(displayOutputTag).addSink(MyKafkaUtil.getFlinkKafkaProducer(displaySinkTopic));
env.execute();
}
}
2)部署運行
BaseLogApp.sh
#!/bin/bash
source ~/.bashrc
cd $(dirname $0)
day=$(date +%Y%m%d%H%M)
#flink
job_name=02_dwd_BaseLogApp
clazz=com.flink.realtime.app.dwd.BaseLogApp
jar_path=/opt/module/gmall-flink/03_gmall2021-1.0-SNAPSHOT-jar-with-dependencies.jar
#-----------------------run----------------------------------------------
#yarn模式:per-job
/opt/module/flink-1.12.0/bin/flink run
-t yarn-per-job
-Dyarn.application.name=${job_name}
-Dyarn.application.queue=default
-Djobmanager.memory.process.size=1024m
-Dtaskmanager.memory.process.size=1024m
-Dtaskmanager.numberOfTaskSlots=2
-c ${clazz} ${jar_path}
3.業務數據
3.1 實現功能
業務數據的變化,我們可以通過FlinkCDC采集到,但是FlinkCDC是把全部數據統一寫入一個Topic中, 這些數據包括事實數據,也包含維度數據,這樣顯然不利于日后的數據處理,所以這個功能是從Kafka的業務數據ODS層讀取數據,經過處理后,將維度數據保存到HBase,將事實數據寫回Kafka作為業務數據的DWD層。
3.2 動態分流
由于FlinkCDC是把全部數據統一寫入一個Topic中, 這樣顯然不利于日后的數據處理。所以需要把各個表拆開處理。但是由于每個表有不同的特點,有些表是維度表,有些表是事實表。
在實時計算中一般把維度數據寫入存儲容器,一般是方便通過主鍵查詢的數據庫比如HBase,Redis,MySQL等。一般把事實數據寫入流中,進行進一步處理,最終形成寬表。
這樣的配置不適合寫在配置文件中,因為這樣的話,業務端隨著需求變化每增加一張表,就要修改配置重啟計算程序。所以這里需要一種動態配置方案,把這種配置長期保存起來,一旦配置有變化,實時計算可以自動感知。
這種可以有兩個方案實現
一種是用Zookeeper存儲,通過Watch感知數據變化。
另一種是用mysql數據庫存儲,周期性的同步。
這里選擇第二種方案,主要是MySQL對于配置數據初始化和維護管理,使用FlinkCDC讀取配置信息表,將配置流作為廣播流與主流進行連接。
1)建配置表:create.sql
--配置表
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '來源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作類型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '輸出類型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '輸出表(主題)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '輸出字段',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主鍵字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表擴展',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
品牌維表
CREATE TABLE `base_trademark` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '編號',
`tm_name` varchar(100) NOT NULL COMMENT '屬性值',
`logo_url` varchar(200) DEFAULT NULL COMMENT '品牌logo的圖片路徑',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC COMMENT='品牌表'
--品牌表:base_trademark表 insert 操作,插入hbase的dim_base_trademark只要 `id`,`name` 字段, `id`作為主鍵
insert into table_process values ('base_trademark','insert','hbase','dim_base_trademark','id,name','id','');
mysql> select * from table_process;
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
| source_table | operate_type | sink_type | sink_table | sink_columns | sink_pk | sink_extend |
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
| base_trademark | insert | hbase | dim_base_trademark | id,name | id | |
+----------------+--------------+-----------+--------------------+--------------+---------+-------------+
1 row in set (0.00 sec)
2)配置類:TableProcess.java
import lombok.Data;
/**
* @description: TODO 配置表實體類
* @author: HaoWu
* @create: 2021年06月25日
*/
@Data
public class TableProcess {
//動態分流Sink常量
public static final String SINK_TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
public static final String SINK_TYPE_CK = "clickhouse";
//來源表
String sourceTable;
//操作類型 insert,update,delete
String operateType;
//輸出類型 hbase kafka
String sinkType;
//輸出表(主題)
String sinkTable;
//輸出字段
String sinkColumns;
//主鍵字段
String sinkPk;
//建表擴展
String sinkExtend;
}
3)MysqlUtil.java
import com.flink.realtime.bean.TableProcess;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.base.CaseFormat;
import java.lang.reflect.InvocationTargetException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* @description: TODO Mysql工具類
* @author: HaoWu
* @create: 2021年07月23日
* 完成ORM,對象關系映射
* O:Object對象 Java中對象
* R:Relation關系 關系型數據庫
* M:Mapping映射 將Java中的對象和關系型數據庫的表中的記錄建立起映射關系
*/
public class MysqlUtil {
/**
* @param sql 執行sql語句
* @param clazz 封裝bean類型
* @param underScoreToCamel 是否列名轉駝峰命名
* @param <T>
* @return
*/
public static <T> List<T> queryList(String sql, Class<T> clazz, Boolean underScoreToCamel) {
Connection con = null;
PreparedStatement ps = null;
ResultSet rs = null;
try {
// 注冊驅動
Class.forName("com.mysql.jdbc.Driver");
// 獲取連接
con = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/gmall-realtime?characterEncoding=utf-8&useSSL=false", "root", "root");
// 獲取數據庫操作對象
ps = con.prepareStatement(sql);
// 執行sql
rs = ps.executeQuery();
// 處理結果集,封裝list對象
ResultSetMetaData metaData = rs.getMetaData(); //獲取結果集元數據
ArrayList<T> resultList = new ArrayList<>();
while (rs.next()) {
// 將單條記錄封裝對象
T obj = clazz.newInstance();
// 遍歷所有列,轉駝峰,對象屬性賦值
for (int i = 1; i < metaData.getColumnCount(); i++) {
String columnName = metaData.getColumnName(i);
String propertyName = "";
if (underScoreToCamel) {
// 通過guava工具類,將表中的列轉換為類屬性的駝峰命名
propertyName = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, columnName);
}
// 給屬性賦值
BeanUtils.setProperty(obj,propertyName,rs.getObject(i));
}
resultList.add(obj);
}
return resultList;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("從Mysql查詢數據失敗");
} finally {
// 釋放資源
if (rs != null) {
try {
rs.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if (ps != null) {
try {
ps.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
if (con != null) {
try {
con.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}
public static void main(String[] args) throws InvocationTargetException, IllegalAccessException {
String sql="select * from table_process";
List<TableProcess> list = MysqlUtil.queryList(sql, TableProcess.class, true);
System.out.println(list);
TableProcess tableProcess = new TableProcess();
BeanUtils.setProperty(tableProcess,"sourceTable","redis");
System.out.println(tableProcess);
}
}
4)常量類:GmallConfig.java
package com.flink.realtime.common;
/**
* @description: TODO 常量配置類
* @author: HaoWu
* @create: 2021年06月25日
*/
public class GmallConfig {
//Phoenix庫名
public static final String HBASE_SCHEMA = "bigdata";
//Phoenix驅動
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
//Phoenix連接參數
public static final String PHOENIX_SERVER = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";
public static final String CLICKHOUSE_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
public static final String CLICKHOUSE_URL = "jdbc:clickhouse://hadoop102:8123/default";
}
5)主程序:BaseDBApp.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.app.func.DimSink;
import com.flink.realtime.app.func.TableProcessFunction;
import com.flink.realtime.bean.TableProcess;
import com.flink.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.OutputTag;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
/**
* @description: todo->準備業務數據dwd層
* @author: HaoWu
* @create: 2021年06月22日
*/
public class BaseDbApp {
public static void main(String[] args) throws Exception {
// TODO 1.創建執行環境
// 1.1 創建stream執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 1.2 設置并行度
env.setParallelism(1);
/*
// 1.3 設置checkpoint參數
env.enableCheckpointing(5000L); //每5000ms做一次ck
env.getCheckpointConfig().setCheckpointTimeout(60000L); // ck超時時間:1min
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //ck模式,默認:exactly_once
//正常Cancel任務時,保留最后一次CK
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//重啟策略
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
//狀態后端:
env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall/checkpoint/base_db_app"));
// 訪問hdfs訪問權限問題
// 報錯異常:Permission denied: user=haowu, access=WRITE, inode="/":atguigu:supergroup:drwxr-xr-x
// 解決:/根目錄沒有寫權限 解決方案1.hadoop fs -chown 777 / 2.System.setProperty("HADOOP_USER_NAME", "atguigu");
System.setProperty("HADOOP_USER_NAME", "atguigu");
*/
// TODO 2.獲取kafka的ods層業務數據:ods_basic_db
String ods_db_topic = "ods_base_db";
FlinkKafkaConsumer<String> kafkaConsumer = MyKafkaUtil.getKafkaConsumer("hadoop102:9092", ods_db_topic, "ods_base_db_consumer1", "false", "latest");
DataStreamSource<String> jsonStrDS = env.addSource(kafkaConsumer);
//jsonStrDS.print();
// TODO 3.對jsonStrDS結構轉換
SingleOutputStreamOperator<JSONObject> jsonDS = jsonStrDS.map(jsonStr -> JSON.parseObject(jsonStr));
// TODO 4.對數據ETL
SingleOutputStreamOperator<JSONObject> filterDS = jsonDS.filter(
json -> {
boolean flag = json.getString("table") != null //表名不為null
&& json.getString("data") != null //數據不為null
&& json.getString("data").length() >= 3; //數據長度大于3
return flag;
}
);
//filterDS.print("filterDS>>>>>>>>>>");
// TODO 5. 動態分流:事實表放-主流 -> kafka dwd層 ,維度表-側輸出流 -> hbase
// 5.1 定義輸出到Hbase的側輸出流標簽
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>(TableProcess.SINK_TYPE_HBASE) {
};
// 5.2 主流輸出到kafka
SingleOutputStreamOperator<JSONObject> kafkaDS = filterDS.process(new TableProcessFunction(hbaseTag));
// 5.3 獲取側輸出流到hbase
DataStream<JSONObject> hbaseDS = kafkaDS.getSideOutput(hbaseTag);
kafkaDS.print("實時:kafkaDS>>>>>>>>");
hbaseDS.print("維度:hbaseDS>>>>>>>>");
// TODO 6.維度數據保存到Hbase中
hbaseDS.addSink(new DimSink());
// TODO 7.實時數據保存到Kafka中,自定義序列化
FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
System.out.println("kafka序列化");
}
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
String sink_topic = jsonObject.getString("sink_table");
JSONObject data = jsonObject.getJSONObject("data");
return new ProducerRecord<>(sink_topic, data.toString().getBytes());
}
});
kafkaDS.addSink(kafkaSink);
// TODO 8.執行
env.execute();
}
}
6)自定義分流函數:TableProcessFunction.java
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.bean.TableProcess;
import com.flink.realtime.common.GmallConfig;
import com.flink.realtime.utils.MysqlUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.List;
/**
* @description: TODO 業務數據分流自定義Process函數
* @author: HaoWu
* @create: 2021年07月26日
*/
public class TableProcessFunction extends ProcessFunction<JSONObject, JSONObject> {
//維表側輸出標簽
private OutputTag<JSONObject> outputTag;
//內存中存儲表配置對象{表名,表配置信息}
private Map<String, TableProcess> tableProcessMap = new HashMap<>();
//內存中判斷是否已經存在Hbase表
private Set<String> existsTables = new HashSet<>();
//定義Phoenix連接
private Connection connection;
public TableProcessFunction() {
}
public TableProcessFunction(OutputTag<JSONObject> outputTag) {
this.outputTag = outputTag;
}
@Override
public void open(Configuration parameters) throws Exception {
//初始化phoenix連接
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
//初始化配置表信息
initTableProcessMap();
//配置表的信息可能會發生表更,需要開啟定時任務從現在起5000ms后,每隔5000ms更新一次
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
initTableProcessMap();
}
}, 5000, 5000);
}
@Override
public void processElement(JSONObject jsonObj, Context ctx, Collector<JSONObject> out) throws Exception {
//表名
String tableName = jsonObj.getString("table");
//操作類型
String type = jsonObj.getString("type");
//注意:問題修復 如果使用maxwell同步歷史數據,他的操作類型是bootstrap-insert
if ("bootstrap-insert".equals(type)) {
type = "insert";
jsonObj.put("type", type);
}
if (tableProcessMap != null && tableProcessMap.size() > 0) {
//根據key取出配置信息
String key = tableName + ":" + type;
TableProcess tableProcess = tableProcessMap.get(key);
//判斷是否獲取到配置對象
if (tableProcess != null) {
//獲取sinkTable,指明數據發往何處。 維度數據->hbase , 事實數據->kafka ,給這條數據打上一個標記。
jsonObj.put("sink_table", tableProcess.getSinkTable());
//指定了sinkcolumn,對需要保留的字段進行過濾
String sinkColumns = tableProcess.getSinkColumns();
if (sinkColumns != null && sinkColumns.length() > 0) {
filterColumn(jsonObj.getJSONObject("data"), sinkColumns);
}
} else {
System.out.println("No this Key <<<< " + key + ">>>> in MySQL");
}
//根據sinkType輸出到不同的流
if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_HBASE)) {
//sinkType=hbase 輸出到側輸出流
ctx.output(outputTag, jsonObj);
} else if (tableProcess != null && tableProcess.getSinkType().equals(TableProcess.SINK_TYPE_KAFKA)) {
//sinkType=kafka 輸出到主流
out.collect(jsonObj);
}
}
}
/**
* 從mysql查詢配置信息,保存到map內存中
*/
private void initTableProcessMap() {
System.out.println("查詢配置表信息");
//1.從mysql中查詢配置信息
List<TableProcess> tableProcesses = MysqlUtil.queryList("select * from table_process", TableProcess.class, true);
for (TableProcess tableProcess : tableProcesses) {
String sourceTable = tableProcess.getSourceTable(); //源表
String operateType = tableProcess.getOperateType(); //操作類型
String sinkType = tableProcess.getSinkType(); //目標表類型
String sinkTable = tableProcess.getSinkTable(); //目標表名
String sinkPk = tableProcess.getSinkPk(); //目標表主鍵
String sinkColumns = tableProcess.getSinkColumns(); //目標表字段
String sinkExtend = tableProcess.getSinkExtend(); //擴展字段
//2.將配置信息封裝成map集合
tableProcessMap.put(sourceTable + ":" + operateType, tableProcess);
//3.檢查是表是否內存中存在
//如果向Hbase保存的表,那么判斷內存中set是否存在過。
if ("insert".equals(operateType) && "hbase".equals(sinkType)) {
boolean isExist = existsTables.add(sinkTable);
//4.如果內存中不存在表數據信息,則創建新Hbase表
if (isExist) {
checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
}
}
}
}
/**
* 通過Phoenix創建Hbase表
*
* @param tableName 表名
* @param columns 列屬性
* @param pk 主鍵
* @param extend 擴展字段
*/
private void checkTable(String tableName, String columns, String pk, String extend) {
//主鍵不存在給默認值
if (pk == null) {
pk = "id";
}
//擴展字段給默認值
if (extend == null) {
extend = "";
}
//拼接sql建表語句
StringBuilder createSql = new StringBuilder("create table if not exists " + GmallConfig.HBASE_SCHEMA + "." + tableName + "(");
//拼接列屬性
String[] fieldArr = columns.split(",");
for (int i = 0; i < fieldArr.length; i++) {
String field = fieldArr[i];
//判斷是否為主鍵
if (field.equals(pk)) {
createSql.append(field).append(" varchar primary key ");
} else {
createSql.append("info.").append(field).append(" varchar");
}
//非最后一個字段需要添加逗號
if (i < fieldArr.length - 1) {
createSql.append(",");
}
}
createSql.append(")");
createSql.append(extend);
System.out.println("建表sql:" + createSql);
//通過Phoenix創建hbase表
PreparedStatement ps = null;
try {
ps = connection.prepareStatement(createSql.toString());
ps.execute();
} catch (SQLException throwables) {
throwables.printStackTrace();
throw new RuntimeException("建表失?。。。。?!" + tableName);
} finally {
if (ps != null) {
try {
ps.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}
/**
* 篩選配置表中保留的字段
*
* @param data 每行數據記錄
* @param sinkColumns 配置表保留字段
*/
private void filterColumn(JSONObject data, String sinkColumns) {
//需要保留的字段
String[] columns = sinkColumns.split(",");
//數組轉集合,判斷集合中是否包含某個元素
List<String> columnList = Arrays.asList(columns);
//獲取json中封裝的鍵值對,每個鍵值對封裝為一個Entry類型
Set<Map.Entry<String, Object>> entrySet = data.entrySet();
/*for (Map.Entry<String, Object> entry : entrySet) {
if (!columnList.contains(entry.getKey())) {
entrySet.remove();
} 遍歷集合刪除元素使用迭代器,for循環刪除會報錯
}*/
Iterator<Map.Entry<String, Object>> iterator = entrySet.iterator();
while (iterator.hasNext()) {
Map.Entry<String, Object> entry = iterator.next();
if (!columnList.contains(entry.getKey())) {
iterator.remove();
}
}
}
}
7)HbaseSink:DimSink.java
import com.alibaba.fastjson.JSONObject;
import com.flink.realtime.common.GmallConfig;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Set;
/**
* @description: TODO Hbase sink 通過Phoenix向Hbase表中寫數據
* @author: HaoWu
* @create: 2021年07月30日
*/
public class DimSink extends RichSinkFunction<JSONObject> {
//定義Phoenix連接
Connection connection = null;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}
/**
* 生成語句提交hbase
*
* @param jsonObj
* @param context
* @throws Exception
*/
@Override
public void invoke(JSONObject jsonObj, Context context) {
String sinkTableName = jsonObj.getString("sink_table");
JSONObject dataObj = jsonObj.getJSONObject("data");
if (dataObj != null && dataObj.size() > 0) {
String upsertSql = genUpdateSql(sinkTableName.toUpperCase(), jsonObj.getJSONObject("data"));
System.out.println(upsertSql);
try {
PreparedStatement ps = connection.prepareStatement(upsertSql);
ps.executeUpdate();
connection.commit();
} catch (SQLException throwables) {
throwables.printStackTrace();
throw new RuntimeException("執行upsert語句失?。。?!");
}
}
}
/**
* 生成upsert語句
*
* @param sinkTableName
* @param data
* @return
*/
private String genUpdateSql(String sinkTableName, JSONObject data) {
Set<String> fields = data.keySet();
String upsertSql = "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTableName + " (" + StringUtils.join(fields, ",") + ")";
String valuesSql = " values ('" + StringUtils.join(data.values(), "','") + "')";
return upsertSql + valuesSql;
}
}
8)自定義序列化 kafka sink
/**
* todo kafka sink 自定義序列化,各種類型自定義傳輸
*
* @return
*/
public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
Properties props = new Properties();
//kafka地址
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092,hadoop104:9092");
//生產數據超時時間
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 15 * 60 * 1000 + "");
return new FlinkKafkaProducer<T>(DEFAULT_TOPIC, kafkaSerializationSchema, props, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
// TODO 7.實時數據保存到Kafka中,自定義序列化
FlinkKafkaProducer<JSONObject> kafkaSink = MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
@Override
public void open(SerializationSchema.InitializationContext context) throws Exception {
System.out.println("kafka序列化");
}
@Override
public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long aLong) {
String sink_topic = jsonObject.getString("sink_table");
JSONObject data = jsonObject.getJSONObject("data");
return new ProducerRecord<>(sink_topic, data.toString().getBytes());
}
});
3.4 主程序:流程總結分析
TableProcessFunction是一個自定義算子,主要包括三條時間線任務
圖中紫線,這個時間線與數據流入無關,只要任務啟動就會執行。主要的任務方法是open()這個方法在任務啟動時就會執行。他的主要工作就是初始化一些連接,開啟周期調度。
圖中綠線,這個時間線也與數據流入無關,只要周期調度啟動,會自動周期性執行。主要的任務是同步配置表(tableProcessMap)。通過在open()方法中加入timer實現。同時還有個附帶任務就是如果發現不存在數據表,要根據配置自動創建數據庫表。
圖中黑線,這個時間線就是隨著數據的流入持續發生,這部分的任務就是根據同步到內存的tableProcessMap,來為流入的數據進行標識,同時清理掉沒用的字段。
3.5 思考
1.目前的配置表只能識別新增的配置項,不支持修改原有的配置項 ?
4.整體流程圖分析
總結
以上是生活随笔為你收集整理的实时数仓(二):DWD层-数据处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 中国抗疫物资援外清单(国际抗疫援助)
- 下一篇: 怎么创建具有真实纹理的CG场景岩石?