日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Kafka2.5->Flink1.12->Mysql8(Jark实验改为DDL形式)

發(fā)布時間:2023/12/31 数据库 26 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Kafka2.5->Flink1.12->Mysql8(Jark实验改为DDL形式) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

##############################################實驗目的和環(huán)境###############################################

本文是為了在最新的版本上復現(xiàn)[1],環(huán)境如下:

組件版本
Zookeeper3.6.0
Flink1.12
Mysql8.0.22-0ubuntu0.20.04.2
Kafka2.5.0

[1]的作者Jark 采用了Java代碼的形式,這篇博客對其流程進行了等效簡化,

采用Flink SQL Client上純DDL+SQL 的形式,全篇無一行Java代碼.

?

注意關(guān)閉防火墻

service firewalld stop

啟動zookeeper,hadoop(這個應該沒用,但是平時習慣了,也啟動吧),flink,mysql,kafka

138210 StandaloneSessionClusterEntrypoint
24593 ResourceManager
115362 QuorumPeerMain
24006 DataNode
124727 Kafka
138491 TaskManagerRunner
138603 Jps
23817 NameNode
24315 SecondaryNameNode
79162 SqlClient
24798 NodeManager

############################################################################################################

文件作用解析:

Jark給的文件

文件的作用

需要修改的地方

kafka-common.sh

kafka生產(chǎn)端輸入數(shù)據(jù)

?

brokers/ids/1

localhost改成自己的節(jié)點域名

run.sh用來在 Web UI 中看到拓撲
source-generator.sh

創(chuàng)建topic

往kafka里面填充數(shù)據(jù)

--broker-list后面改掉

topic改成自己需要的

pom.xml依賴文件版本號根據(jù)自己需要修改即可
kafka-consumer.shkafka消費端?
env.sh環(huán)境變量設(shè)置FLINK_DIR
KAFKA_DIR
src/main/resources/user_behavior.log數(shù)據(jù)來源
src/main/resources/q1.sql

source table定義

sink table定義

connector.properties.0.value

connector.properties.1.value

connector.url

connector.username

connector.password

?

*.java?

操作步驟:

操作命令作用

?

一些準備工作

mysql> create database `flink-test`;(因為這里有橫杠,所以需要使用``包起來)

datagrip建表語句

create table pvuv_sink
(
?? ?dt char(30) not null,
?? ?pv bigint null,
?? ?uv bigint null,
?? ?constraint pvuv_sink_pk
?? ??? ?primary key (dt)
);

注意,dt不要使用varchar,否則會導致無法設(shè)定為primary key

在mysql中建立sink表格
intellij運行SqlSubmit

生成flink-sql-submit.jar

被下面的source-generator.sh調(diào)用

./source-generator.sh創(chuàng)建kafka的topic,然后往里面填充數(shù)據(jù)
./run.sh q1?提交成功后,可以在 Web UI 中看到拓撲

該實驗的kafka主題是user_behavior

############################################################################################################

kafka常用操作如下:

操作命令備注
查看topic$KAFKA/bin/kafka-topics.sh --list --zookeeper Desktop:2181

如果想刪除topic,可以是:

?


?

user_behavior這個 topic發(fā)送 json 消息$KAFKA/bin/kafka-console-producer.sh --broker-list Desktop:9091 --topic?user_behavior

這里可能碰到[2]中的報錯,注意檢查命令中端口與配置文件server.properties中的listeners的端口嚴格保持一致

[2]中的報錯還可能是某個節(jié)點的kafka掛掉導致的.

?

可能碰到[3]

注意關(guān)閉防火墻

?

?

使用kafka自帶消費端測試下消費$KAFKA/bin/kafka-console-consumer.sh --bootstrap-server Desktop:9091 --from-beginning --topic?user_behavior

如果kafka自帶消費者測試有問題,那么就不用繼續(xù)往下面做了,

此時如果使用Flink SQL Client來消費也必然會出現(xiàn)問題

清除topic中所有數(shù)據(jù)[6](因為,萬一你輸錯了呢?對吧)$KAFKA/bin/kafka-topics.sh --zookeeper Desktop:2181 --delete --topic?user_behavior

需要$KAFKA/config/server.properties設(shè)置

delete.topic.enable=true

傳入kafka的user_behavior的數(shù)據(jù)舉例如下(完整數(shù)據(jù)集):

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "561558", "item_id":"3611281", "category_id": "965809", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "894923", "item_id":"3076029", "category_id": "1879194", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "834377", "item_id":"4541270", "category_id": "3738615", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "315321", "item_id":"942195", "category_id": "4339722", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "625915", "item_id":"1162383", "category_id": "570735", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "578814", "item_id":"176722", "category_id": "982926", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "873335", "item_id":"1256540", "category_id": "1451783", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "429984", "item_id":"4625350", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "866796", "item_id":"534083", "category_id": "4203730", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "937166", "item_id":"321683", "category_id": "2355072", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

############################################################################################################

下面時對q1.sql中的DDL/SQL的簡要介紹

DDL/SQL語句作用

CREATE TABLE user_log (
? ? user_id VARCHAR,
? ? item_id VARCHAR,
? ? category_id VARCHAR,
? ? behavior VARCHAR,
? ? ts TIMESTAMP
) WITH (
? ? 'connector.type' = 'kafka',
? ? 'connector.version' = 'universal',
? ? 'connector.topic' = 'user_behavior',
? ? 'connector.startup-mode' = 'earliest-offset',
? ? 'connector.properties.0.key' = 'zookeeper.connect',
? ? 'connector.properties.0.value' = 'Desktop:2181',
? ? 'connector.properties.1.key' = 'bootstrap.servers',
? ? 'connector.properties.1.value' = 'Desktop:9091',
? ? 'update-mode' = 'append',
? ? 'format.type' = 'json',
? ? 'format.derive-schema' = 'true'
);

接收數(shù)據(jù)源頭

CREATE TABLE pvuv_sink (
? ? dt VARCHAR,
? ? pv BIGINT,
? ? uv BIGINT
) WITH (
? ? 'connector.type' = 'jdbc',
? ? 'connector.url' = 'jdbc:mysql://Desktop:3306/flink-test',
? ? 'connector.table' = 'pvuv_sink',
? ? 'connector.username' = 'appleyuchi',
? ? 'connector.password' = 'appleyuchi',
? ? 'connector.write.flush.max-rows' = '1'
);

數(shù)據(jù)存儲目標
INSERT INTO pvuv_sink
SELECT
? DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
? COUNT(*) AS pv,
? COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');

對source的數(shù)據(jù)的處理,

指定要存入的sink

網(wǎng)頁可能會污染上述DDL導致各種報錯,以下面為準:

https://gitee.com/appleyuchi/Flink_Code/blob/master/FLINK讀寫各種數(shù)據(jù)源/flink-sql-submit/src/main/resources/q1.sql

?

整個實驗操作步驟流程其實就是一句話:

kafka往user_behavior這個topic填入數(shù)據(jù)以后

啟動Flink SQL Client

$FLINK_HOME/bin/sql-client.sh embedded

然后上面2句DDL+1句SQL全部復制到Flink SQL Client按下回車,

就會自動生成任務(wù)提交到Flink集群,實驗結(jié)束.

?

?

最終實驗效果如下:

?

一點題外話:

整個實驗運作起來后,硬盤磁頭一直在響,所以一旦看到mysql中有數(shù)據(jù)sink以后,

立刻關(guān)掉flink集群,不然實在太傷硬盤了.

Reference:

[1]Flink 1.9 實戰(zhàn):使用 SQL 讀取 Kafka 并寫入 MySQ

[2]Flink 1.10 SQL 讀寫Kafka

創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎

總結(jié)

以上是生活随笔為你收集整理的Kafka2.5->Flink1.12->Mysql8(Jark实验改为DDL形式)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。