Kafka->Flink->Hbase(纯DDL/DML形式)
概述
最近看到有位自稱阿里的工程師在gitbook收費4元[12]:
DDL形式實現kafka->Flink->Hbase
于是自己琢磨了下具體的流程,流程如下:
kafka的主題user_behavior中的內容,
通過Flink SQL Client,
傳遞給hbase的表venn
#########################################################################################################
開發環境
| 組件 | 版本 |
| Flink(HA) | 1.12 |
| Zookeeper | 3.6.0 |
| Hadoop | 3.1.2 |
| Hbase(HA) | 2.2.4 |
| Kafka(HA) | 2.5.0 |
這里解釋下Hbase的版本號為什么不能再高了(最多2.2.6),因為官方文檔[9]以及官方Repository[10]中Flink1.12對Hbase支持的最高版本是2.2.x
注意需要啟動上述表格中的所有集群,
勿忘關閉防火墻,或者開啟zookeeper/kafka集群需要的端口,否則有的尷尬的。
關閉防火牆命令:
service firewalld stop
hbase底層是hdfs,所以這里需要hadoop
###########################################################################################################
hbase常用操作
| 命令行 | 作用 |
| hbase shell | 進入命令行客戶端 |
| create 'venn','cf' | 新建表格venn,其中cf是列簇 |
| scan 'venn',{LIMIT=>1} | 查看新建的表格中的數據內容 |
##########################################################################################################
資料調研(代碼形式)
| 寫入/讀取方式 | 相關參考文獻 |
| 繼承RichSourceFunction(flink streaming) | [1][3][4] |
| 繼承RichSinkFunction重寫父類方法(flink streaming) | [4][5][8][16] |
| 重寫TableInputFormat方法(flink?streaming和flink dataSet) | [1][2][3][4] |
| 重寫TableOutputFormat(flink streaming和flink dataSet) | [1][2][3] |
本文我們先不關心代碼形式,
先設法處理DDL形式的kafka->Flink SQL Client->hbase
########################################可能用到的kafka命令##################################
?
可能用到的kafka命令
| 操作 | 命令 | 備注 |
| 查看topic | $KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181 | 無 ?
|
| 往user_behavior這個 topic發送 json 消息 | ? $KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic user_behavior | 這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致 [2]中的報錯還可能是某個節點的kafka掛掉導致的. ? 可能碰到[3] 注意關閉防火墻 ? ? |
| 使用kafka自帶消費端測試下消費 | $KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic user_behavior | 如果kafka自帶消費者測試有問題,那么就不用繼續往下面做了, 此時如果使用Flink SQL Client來消費也必然會出現問題 |
| 清除topic中所有數據[6](因為,萬一你輸錯了呢?對吧) | $KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic user_behavior | 需要$KAFKA/config/server.properties設置 delete.topic.enable=true |
kafka消費端用自帶的上述命令行先確定能消費,先確保自己集群沒有問題再往下走。
######################################一些資料調研############################################
關于DDL的寫法調研
對于[14],群里大佬指出寫法太老了.
###############################################最終整個流程完整步驟和DDL/SQL代碼##################???????#######???????#######???????#######???????####
最終整個流程完整步驟和DDL/SQL代碼
本文最終決定以[19]為準,從user_behavior這個topic的名字來看,這個實驗其實是修改自Jark云邪的實驗[20]
①往kafka的user_behavior傳入數據
$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic user_behavior
數據示例如下(傳一條按下回車):
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
②終端輸入hbase shell
建表參考[21]
hbase(main):014:0> create 'venn','cf'
③啟動Flink SQL Client(注意附帶要啟動Flink集群,本實驗室standalone模式)
| Flink SQL Client中依次輸入 | 輸入后的實驗效果 |
| -- 讀 kafka ,source定義 (具體見下方gitee鏈接) | |
| --流入hbase,sink 定義 (具體見下方gitee鏈接) | ? |
| --提交任務,user_id當做row key,其他的當做列簇 (具體見下方gitee鏈接) |
上述 DDL可能在粘貼復制中被網頁污染導致無法運行,以下方鏈接為準:
https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數據源/Kafka_Flink_Hbase.txt
提交后,終端輸入hbase shell登錄hbase 客戶端,
這里解釋下為什么上面的DDL是2.2.3,而環境版本里面是2.2.4,這是因為這篇博文采用的是老版本的DDL寫法,會校對最高小版本號為2.2.3
如果采用新的DDL option寫法(本文沒有采用,DDL option寫法會無視小版本號校對,option寫法是本文附錄中flink開發者提及的.)
?
查看結果:
hbase(main):002:0> scan 'venn',{LIMIT=>1}
#######???????#######???????#######???????#######???????#######???????#######???????#############實驗結果#???????#######???????#######???????#######???????#######???????#######???????##############???????#######???????#######???????#######???????###
#######???????#######???????#######???????#######???????#######???????#######???????#######???????#附錄######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######
附錄-依賴問題
關于依賴包問題,我首先在[10]發現了一些hbase相關的jar
| jar包名字 | 是否更新到 Flink1.12版本 | 作用 |
| flink-hbase_2.12-1.10.2.jar | 否 | 用于gradoop |
| flink-connector-hbase-base_2.12-1.12.0.jar | 是 | 是一個hbase驅動基礎包 |
| flink-connector-hbase-2.2_2.12-1.12.0.jar??????? | 是 | 猜測是上面一個jar的完整版 |
| flink-sql-connector-hbase-2.2_2.12-1.12.0.jar | 是 | 猜測是為了支持Flink SQL Client的一個依賴包 |
先說結論,上面最后一個jar必須放入$FLINK_HOME/lib下面.
這里寫一個小筆記,怎么知道上面這些jar是干嘛的呢?
以flink-hbase_2.12-1.10.2.jar為例:
點開https://mvnrepository.com/search?q=flink-hbase中的上圖中的4 usages
會來到新的頁面,如下:
此時我們就知道這個包是被gradoop這個工具包調用的,和我們即將進行的實驗很可能關系不大
?
最后,我決定在$FLINK_HOME/lib下面放兩個依賴包:
flink-connector-hbase-2.2_2.12-1.12.0.jar
flink-sql-connector-hbase-2.2_2.12-1.12.0.jar
釘釘群里問開發者的時候時候,雲邪說上述兩個Jar只保留后者即可,下面是對話
其實我是看了Flink1.12的2.11的文檔,去嘗試2.12的實驗(所以并非我搞混2.11和2.12)
發現Flink1.12并不支持Hbase2.2.6和Hbase2.2.4,于是去官方群里提問,
他們解釋說由于我采用了老版本的DDL的寫法,導致flink對小版本號開始校驗了.
也就是說從1.11開始,DDL出了一種他們稱為option的寫法,
老版本的寫法會校對小版本號(本文采用了老版本的DDL寫法),
option寫法[9]不會校對小版本號(官方推薦)
文中使用的是老版本的寫法
從這里也可以看出,DDL的寫法發生了不小的變化,從一開始的支持 Schema以及上述gitee中的老版本寫法以及后續的option,至少有三個版本的DDL寫法,
???????在閱讀其他資料的時候也需要注意這點.
#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######???????#######
Reference:
[1]HBase讀寫的幾種方式(三)flink篇
[2]Flink操作HBase
[3]Flink讀寫系列之-讀HBase并寫入HBase
[4]flink與hbase交互
[5]Flink讀取數據存入Hbase
[6]HBase讀寫的幾種方式(三)flink篇
[7]HBase2實戰:HBase Flink和Kafka整合
[8]FlinkSQL讀取Hbase數據
[9]HBase SQL Connector
[10]https://repo1.maven.org/maven2/org/apache/flink/
[11]Flink SQL 化實時任務實戰:讀取Kafka,計算并寫入 HBase
[12]從 0 到 1:構建全 SQL 化 Flink 實時任務
[13]hbase創建表_Flink SQL實戰 — HBase的結合應用(余敖的,不完整)
[14]Flink SQL 將kafka數據寫入HBase(用的老式DDL寫法, 廣聯達的二把手說該寫法已經淘汰)
[15]FlinkSQL 數據去重,讀寫HBase,Kafka(這個要盡可能復現一下)
[16]flink實戰(一) flink-sql關聯hbase維度數據處理(用的RichSinkFunction)
[17]Flink SQL 實戰:HBase 的結合應用(余敖的,不完整)
[18]Flink sql 基于hbase,mysql的維表實戰 -未完(hbase和kafka做join 然后爛尾了)
[19]Flink 1.10 SQL 寫HBase???????(本文重點復現)
[20]Kafka2.5->Flink1.12->Mysql8(Jark實驗改為DDL形式)
[21]Flink 1.10 SQL 寫HBase
?
?
總結
以上是生活随笔為你收集整理的Kafka->Flink->Hbase(纯DDL/DML形式)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: leetcode 全解(python版)
- 下一篇: MySQL进程杀掉后是killed