日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Kafka->Flink->Hbase(纯DDL/DML形式)

發布時間:2023/12/31 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka->Flink->Hbase(纯DDL/DML形式) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

概述

最近看到有位自稱阿里的工程師在gitbook收費4元[12]:

DDL形式實現kafka->Flink->Hbase

于是自己琢磨了下具體的流程,流程如下:

kafka的主題user_behavior中的內容,

通過Flink SQL Client,

傳遞給hbase的表venn

#########################################################################################################

開發環境

組件版本
Flink(HA)1.12
Zookeeper3.6.0
Hadoop3.1.2
Hbase(HA)2.2.4
Kafka(HA)2.5.0

這里解釋下Hbase的版本號為什么不能再高了(最多2.2.6),因為官方文檔[9]以及官方Repository[10]中Flink1.12對Hbase支持的最高版本是2.2.x

注意需要啟動上述表格中的所有集群,

勿忘關閉防火墻,或者開啟zookeeper/kafka集群需要的端口,否則有的尷尬的。

關閉防火牆命令:

service firewalld stop

hbase底層是hdfs,所以這里需要hadoop

###########################################################################################################

hbase常用操作

命令行作用
hbase shell進入命令行客戶端
create 'venn','cf'新建表格venn,其中cf是列簇
scan 'venn',{LIMIT=>1}查看新建的表格中的數據內容

##########################################################################################################

資料調研(代碼形式)

寫入/讀取方式

相關參考文獻

繼承RichSourceFunction(flink streaming)

[1][3][4]
繼承RichSinkFunction重寫父類方法(flink streaming)[4][5][8][16]
重寫TableInputFormat方法(flink?streaming和flink dataSet)[1][2][3][4]
重寫TableOutputFormat(flink streaming和flink dataSet)[1][2][3]

本文我們先不關心代碼形式,

先設法處理DDL形式的kafka->Flink SQL Client->hbase

########################################可能用到的kafka命令##################################

?

可能用到的kafka命令

操作命令備注
查看topic$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

?


?

user_behavior這個 topic發送 json 消息

?

$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic user_behavior

這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致

[2]中的報錯還可能是某個節點的kafka掛掉導致的.

?

可能碰到[3]

注意關閉防火墻

?

?

使用kafka自帶消費端測試下消費$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic user_behavior

如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了,

此時如果使用Flink SQL Client來消費也必然會出現問題

清除topic中所有數據[6](因為,萬一你輸錯了呢?對吧)$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic user_behavior

需要$KAFKA/config/server.properties設置

delete.topic.enable=true

kafka消費端用自帶的上述命令行先確定能消費,先確保自己集群沒有問題再往下走。

######################################一些資料調研############################################

關于DDL的寫法調研

對于[14],群里大佬指出寫法太老了.

###############################################最終整個流程完整步驟和DDL/SQL代碼##################???????#######???????#######???????#######???????####

最終整個流程完整步驟和DDL/SQL代碼

本文最終決定以[19]為準,從user_behavior這個topic的名字來看,這個實驗其實是修改自Jark云邪的實驗[20]

①往kafka的user_behavior傳入數據

$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic user_behavior

數據示例如下(傳一條按下回車):

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

②終端輸入hbase shell

建表參考[21]
hbase(main):014:0> create 'venn','cf'

③啟動Flink SQL Client(注意附帶要啟動Flink集群,本實驗室standalone模式)

Flink SQL Client中依次輸入輸入后的實驗效果

-- 讀 kafka ,source定義

(具體見下方gitee鏈接)

--流入hbase,sink 定義
(具體見下方gitee鏈接)
?
--提交任務,user_id當做row key,其他的當做列簇
(具體見下方gitee鏈接)

上述 DDL可能在粘貼復制中被網頁污染導致無法運行,以下方鏈接為準:

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/Kafka_Flink_Hbase.txt

提交后,終端輸入hbase shell登錄hbase 客戶端,

這里解釋下為什么上面的DDL是2.2.3,而環境版本里面是2.2.4,這是因為這篇博文采用的是老版本的DDL寫法,會校對最高小版本號為2.2.3

如果采用新的DDL option寫法(本文沒有采用,DDL option寫法會無視小版本號校對,option寫法是本文附錄中flink開發者提及的.)

?

查看結果:

hbase(main):002:0> scan 'venn',{LIMIT=>1}

#######???????#######???????#######???????#######???????#######???????#######???????#############實驗結果#???????#######???????#######???????#######???????#######???????#######???????##############???????#######???????#######???????#######???????###

#######???????#######???????#######???????#######???????#######???????#######???????#######???????#附錄######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######

附錄-依賴問題

關于依賴包問題,我首先在[10]發現了一些hbase相關的jar

jar包名字

是否更新到

Flink1.12版本

作用
flink-hbase_2.12-1.10.2.jar用于gradoop
flink-connector-hbase-base_2.12-1.12.0.jar是一個hbase驅動基礎包

flink-connector-hbase-2.2_2.12-1.12.0.jar???????

猜測是上面一個jar的完整版
flink-sql-connector-hbase-2.2_2.12-1.12.0.jar猜測是為了支持Flink SQL Client的一個依賴包

先說結論,上面最后一個jar必須放入$FLINK_HOME/lib下面.

這里寫一個小筆記,怎么知道上面這些jar是干嘛的呢?

以flink-hbase_2.12-1.10.2.jar為例:

點開https://mvnrepository.com/search?q=flink-hbase中的上圖中的4 usages

會來到新的頁面,如下:

此時我們就知道這個包是被gradoop這個工具包調用的,和我們即將進行的實驗很可能關系不大

?

最后,我決定在$FLINK_HOME/lib下面放兩個依賴包:

flink-connector-hbase-2.2_2.12-1.12.0.jar

flink-sql-connector-hbase-2.2_2.12-1.12.0.jar

釘釘群里問開發者的時候時候,雲邪說上述兩個Jar只保留后者即可,下面是對話

其實我是看了Flink1.12的2.11的文檔,去嘗試2.12的實驗(所以并非我搞混2.11和2.12)

發現Flink1.12并不支持Hbase2.2.6和Hbase2.2.4,于是去官方群里提問,

他們解釋說由于我采用了老版本的DDL的寫法,導致flink對小版本號開始校驗了.

也就是說從1.11開始,DDL出了一種他們稱為option的寫法,

老版本的寫法會校對小版本號(本文采用了老版本的DDL寫法),

option寫法[9]不會校對小版本號(官方推薦)

文中使用的是老版本的寫法

從這里也可以看出,DDL的寫法發生了不小的變化,從一開始的支持 Schema以及上述gitee中的老版本寫法以及后續的option,至少有三個版本的DDL寫法,

???????在閱讀其他資料的時候也需要注意這點.

#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######

Reference:

[1]HBase讀寫的幾種方式(三)flink篇

[2]Flink操作HBase

[3]Flink讀寫系列之-讀HBase并寫入HBase

[4]flink與hbase交互

[5]Flink讀取數據存入Hbase

[6]HBase讀寫的幾種方式(三)flink篇

[7]HBase2實戰:HBase Flink和Kafka整合

[8]FlinkSQL讀取Hbase數據

[9]HBase SQL Connector

[10]https://repo1.maven.org/maven2/org/apache/flink/

[11]Flink SQL 化實時任務實戰:讀取Kafka,計算并寫入 HBase

[12]從 0 到 1:構建全 SQL 化 Flink 實時任務

[13]hbase創建表_Flink SQL實戰 — HBase的結合應用(余敖的,不完整)

[14]Flink SQL 將kafka數據寫入HBase(用的老式DDL寫法, 廣聯達的二把手說該寫法已經淘汰)

[15]FlinkSQL 數據去重,讀寫HBase,Kafka(這個要盡可能復現一下)

[16]flink實戰(一) flink-sql關聯hbase維度數據處理(用的RichSinkFunction)

[17]Flink SQL 實戰:HBase 的結合應用(余敖的,不完整)

[18]Flink sql 基于hbase,mysql的維表實戰 -未完(hbase和kafka做join 然后爛尾了)

[19]Flink 1.10 SQL 寫HBase???????(本文重點復現)

[20]Kafka2.5->Flink1.12->Mysql8(Jark實驗改為DDL形式)

[21]Flink 1.10 SQL 寫HBase

?

?

總結

以上是生活随笔為你收集整理的Kafka->Flink->Hbase(纯DDL/DML形式)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。