

[Data Lake Hudi - 8 - Hudi Integration with Flink - Getting Started]


  • Hudi Integration with Flink: Getting Started
    • 1. Hudi and Flink Version Compatibility
    • 2. Flink Environment Preparation
    • 3. Running Jobs with the Flink SQL Client
      • 1. Modify the Configuration
      • 2. Create a Table and Insert Data
      • 3. Streaming Insert
    • 4. Running Jobs from Code in IDEA
      • 1. Environment Preparation
      • 2. Create a Maven Project and Write the Code
      • 3. Submit and Run
    • 5. Type Mapping Between Flink and Hudi

Hudi Integration with Flink: Getting Started

1. Hudi and Flink Version Compatibility


Hudi 0.11.x is not recommended; if you must use it, use the patch branch: https://github.com/apache/hudi/pull/6182
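For reference, the commonly cited pairing of releases is roughly the following (a summary worth double-checking against the Hudi release notes for your exact versions):

Hudi 0.12.x -> Flink 1.15.x / 1.14.x / 1.13.x
Hudi 0.11.x -> Flink 1.14.x / 1.13.x
Hudi 0.10.x -> Flink 1.13.x
Hudi 0.9.0  -> Flink 1.12.2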

2. Flink Environment Preparation

1) Copy the compiled bundle jar into Flink's lib directory

cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle_2.12-0.12.0.jar /opt/module/flink-1.13.6/lib/

2) Copy the guava jar to resolve a dependency conflict

cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/

3) Configure the Hadoop environment variables

sudo vim /etc/profile.d/my_env.sh

export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

source /etc/profile

3. Running Jobs with the Flink SQL Client

1. Modify the Configuration

  • 1) Edit the flink-conf.yaml configuration

vim /opt/module/flink-1.13.6/conf/flink-conf.yaml

classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4 # Hudi's write tasks default to a parallelism of 4; if you do not tune Hudi itself, adjust the slots here
state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
state.backend.incremental: true
  • 2) yarn-session mode
    (1) Resolve the dependency conflict
    Note:
    The copy below fixes a dependency issue that shows up when Flink runs Hudi jobs: compaction needs to run but never succeeds, and the error is not reported to the aggregated job log, so you have to open the log of the individual Flink task to see the stack trace. The hudi-flink bundle actually already contains the classes being reported; the root cause is a dependency conflict, which copying the Hadoop MapReduce client jar resolves.
cp /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/module/flink-1.13.6/lib/

(2) Start a yarn-session

/opt/module/flink-1.13.6/bin/yarn-session.sh -d

(3) Start the SQL client

/opt/module/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session

2. Create a Table and Insert Data

set sql-client.execution.result-mode=tableau;

-- Create the Hudi table

CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ'  -- default is COPY_ON_WRITE
);

-- Or, equivalently, declare the primary key as a table constraint:
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ'
);
  • Insert data
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
  • Query the data
select * from t1;
  • Update the data
insert into t1 values ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

Note that the save mode is now Append. In general, always use append mode unless you are creating the table for the first time. Querying the data again will now show the updated record. Each write operation generates a new commit, identified by a timestamp. Look for the changes in the _hoodie_commit_time and age fields of the rows whose _hoodie_record_keys match those of the previous commit.
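As a quick sanity check, re-running the query should now return the updated row for id1, written by a newer commit:

select * from t1;
-- id1 should now show age = 27 instead of 23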

3. Streaming Insert

  • 1) Create the test tables
CREATE TABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20)
) with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t2',
  'table.type' = 'MERGE_ON_READ'
);
  • 2) Run the insert
insert into t2 select * from sourceT;
  • 3) Query the result (a further sanity check follows below)
set sql-client.execution.result-mode=tableau;
select * from t2 limit 10;
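Because the datagen source emits one row per second, the table keeps growing for as long as the insert job runs. Re-running a simple count a little later should therefore return a larger number (a small sanity check, assuming the streaming insert from step 2 is still running):

select count(*) from t2;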

4. Running Jobs from Code in IDEA

1. Environment Preparation

  • 1. Manually install the dependency
    In the directory containing hudi-flink1.13-bundle-0.12.0.jar, open a terminal and run the command below. Then check whether the local repository configured in IDEA's Maven settings is the same directory the command installed into; if not, move the installed artifact into that configured local repository.
mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar

2. Create a Maven Project and Write the Code

The code is as follows:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.TimeUnit;

public class HudiDemo {
    public static void main(String[] args) {
        // When running inside IDEA, this variant also starts the web UI
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use RocksDB as the state backend
        EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
        // When running locally in IDEA, point RocksDB at a local storage path
        // embeddedRocksDBStateBackend.setDbStoragePath("file:///E:/rocksdb");
        embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend(embeddedRocksDBStateBackend);

        // Checkpoint configuration
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5), CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/ckps");
        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        // Datagen source table
        tableEnvironment.executeSql("CREATE TABLE sourceT (\n" +
                " uuid varchar(20),\n" +
                " name varchar(10),\n" +
                " age int,\n" +
                " ts timestamp(3),\n" +
                " `partition` varchar(20)\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second' = '1'\n" +
                ")");

        // Hudi sink table
        tableEnvironment.executeSql("create table t2(\n" +
                " uuid varchar(20),\n" +
                " name varchar(10),\n" +
                " age int,\n" +
                " ts timestamp(3),\n" +
                " `partition` varchar(20)\n" +
                ")\n" +
                "with (\n" +
                " 'connector' = 'hudi',\n" +
                " 'path' = 'hdfs://hadoop102:8020/tmp/hudi_flink/t2',\n" +
                " 'table.type' = 'MERGE_ON_READ'\n" +
                ")");

        // Continuously insert generated rows into the Hudi table
        tableEnvironment.executeSql("insert into t2 select * from sourceT");
    }
}

3. Submit and Run

Package the code into a jar, upload it to the myjars directory, and run the submit command:

flink run -t yarn-per-job \
  -c com.yang.hudi.flink.HudiDemo \
  ./myjars/flink-hudi-demo-1.0-SNAPSHOT.jar

5. Type Mapping Between Flink and Hudi
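Hudi persists the table with an Avro schema, so the mapping essentially follows Flink's standard SQL-to-Avro type conversion. The DDL below is a sketch covering commonly used column types, with the Avro type each one is typically written as noted in a comment; the table and column names are only illustrative, and the mapping should be verified against the Hudi documentation for your version:

-- Illustrative Hudi table covering common Flink SQL types;
-- the comment on each column is the Avro type it is usually converted to.
CREATE TABLE type_mapping_demo (
  id       VARCHAR(20),     -- string
  flag     BOOLEAN,         -- boolean
  cnt      INT,             -- int
  big_cnt  BIGINT,          -- long
  ratio    FLOAT,           -- float
  price    DOUBLE,          -- double
  amount   DECIMAL(10, 2),  -- decimal logical type
  payload  BYTES,           -- bytes
  dt       DATE,            -- int with a date logical type
  ts       TIMESTAMP(3),    -- long with a timestamp-millis logical type
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/type_mapping_demo',
  'table.type' = 'MERGE_ON_READ'
);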
