日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL

發布時間:2024/8/23 数据库 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

上周六在深圳分享了《Flink SQL 1.9.0 技術內幕和最佳實踐》,會后許多小伙伴對最后演示環節的 Demo 代碼非常感興趣,迫不及待地想嘗試下,所以寫了這篇文章分享下這份代碼。希望對于 Flink SQL 的初學者能有所幫助。完整分享可以觀看 Meetup 視頻回顧 :https://developer.aliyun.com/live/1416

演示代碼已經開源到了 GitHub 上:https://github.com/wuchong/flink-sql-submit

這份代碼主要由兩部分組成:1) 能用來提交 SQL 文件的 SqlSubmit 實現。2) 用于演示的 SQL 示例、Kafka 啟動停止腳本、 一份測試數據集、Kafka 數據源生成器。

通過本實戰,你將學到:

  • 如何使用 Blink Planner
  • 一個簡單的 SqlSubmit 是如何實現的
  • 如何用 DDL 創建一個 Kafka 源表和 MySQL 結果表
  • 運行一個從 Kafka 讀取數據,計算 PVUV,并寫入 MySQL 的作業
  • 設置調優參數,觀察對作業的影響
  • SqlSubmit 的實現

    筆者一開始是想用 SQL Client 來貫穿整個演示環節,但可惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。所以筆者就只好自己寫了個簡單的提交腳本。后來想想,也挺好的,可以讓聽眾同時了解如何通過 SQL 的方式,和編程的方式使用 Flink SQL。

    SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現非常簡單,就是通過正則表達式匹配每個語句塊。如果是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...)。如果是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要如下所示:

    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); // 創建一個使用 Blink Planner 的 TableEnvironment, 并工作在流模式 TableEnvironment tEnv = TableEnvironment.create(settings); // 讀取 SQL 文件 List<String> sql = Files.readAllLines(path); // 通過正則表達式匹配前綴,來區分不同的 SQL 語句 List<SqlCommandCall> calls = SqlCommandParser.parse(sql); // 根據不同的 SQL 語句,調用 TableEnvironment 執行 for (SqlCommandCall call : calls) {switch (call.command) {case SET:String key = call.operands[0];String value = call.operands[1];// 設置參數tEnv.getConfig().getConfiguration().setString(key, value);break;case CREATE_TABLE:String ddl = call.operands[0];tEnv.sqlUpdate(ddl);break;case INSERT_INTO:String dml = call.operands[0];tEnv.sqlUpdate(dml);break;default:throw new RuntimeException("Unsupported command: " + call.command);} } // 提交作業 tEnv.execute("SQL Job");

    使用 DDL 連接 Kafka 源表

    在 flink-sql-submit 項目中,我們準備了一份測試數據集(來自阿里云天池公開數據集,特別鳴謝),位于 src/main/resources/user_behavior.log。數據以 JSON 格式編碼,大概長這個樣子:

    {"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

    為了模擬真實的 Kafka 數據源,筆者還特地寫了一個 source-generator.sh 腳本(感興趣的可以看下源碼),會自動讀取 user_behavior.log 的數據并以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。

    有了數據源后,我們就可以用 DDL 去創建并連接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql)。

    CREATE TABLE user_log (user_id VARCHAR,item_id VARCHAR,category_id VARCHAR,behavior VARCHAR,ts TIMESTAMP ) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_behavior', -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取'connector.properties.0.key' = 'zookeeper.connect', -- 連接信息'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append','format.type' = 'json', -- 數據源格式為 json'format.derive-schema' = 'true' -- 從 DDL schema 確定 json 解析規則 )

    注:可能有用戶會覺得其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進并簡化 connector 的參數配置。

    使用 DDL 連接 MySQL 結果表

    連接 MySQL 可以使用 Flink 提供的 JDBC connector。例如

    CREATE TABLE pvuv_sink (dt VARCHAR,pv BIGINT,uv BIGINT ) WITH ('connector.type' = 'jdbc', -- 使用 jdbc connector'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url'connector.table' = 'pvuv_sink', -- 表名'connector.username' = 'root', -- 用戶名'connector.password' = '123456', -- 密碼'connector.write.flush.max-rows' = '1' -- 默認5000條,為了演示改為1條 )

    PV UV 計算

    假設我們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。很多用戶可能會想到使用滾動窗口來計算。但這里我們介紹另一種方式。即 Group Aggregation 的方式。

    INSERT INTO pvuv_sink SELECTDATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,COUNT(*) AS pv,COUNT(DISTINCT user_id) AS uv FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

    它使用 DATE_FORMAT 這個內置函數,將日志時間歸一化成“年月日小時”的字符串格式,并根據這個字符串進行分組,即根據每小時分組,然后通過 COUNT(*) 計算用戶訪問量(PV),通過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基于之前計算的值做增量計算(如+1),然后將最新結果輸出。所以實時性很高,但輸出量也大。

    我們將這個查詢的結果,通過 INSERT INTO 語句,寫到了之前定義的 pvuv_sink MySQL 表中。

    注:在深圳 Meetup 中,我們有對這種查詢的性能調優做了深度的介紹。

    實戰演示

    環境準備

    本實戰演示環節需要安裝一些必須的服務,包括:

    • Flink 本地集群:用來運行 Flink SQL 任務。
    • Kafka 本地集群:用來作為數據源。
    • MySQL 數據庫:用來作為結果表。
    • Flink 本地集群安裝

    1.下載 Flink 1.9.0 安裝包并解壓:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
    2.下載以下依賴 jar 包,并拷貝到 flink-1.9.0/lib/ 目錄下。因為我們運行時需要依賴各個 connector 實現。

    • flink-sql-connector-kafka_2.11-1.9.0.jar
      http://central.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.9.0/flink-sql-connector-kafka_2.11-1.9.0.jar
    • flink-json-1.9.0-sql-jar.jar
      http://central.maven.org/maven2/org/apache/flink/flink-json/1.9.0/flink-json-1.9.0-sql-jar.jar
    • flink-jdbc_2.11-1.9.0.jar
      http://central.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.9.0/flink-jdbc_2.11-1.9.0.jar
    • mysql-connector-java-5.1.48.jar
      https://dev.mysql.com/downloads/connector/j/5.1.html

    3.將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改成 10,因為我們的演示任務可能會消耗多于1個的 slot。
    4.在 flink-1.9.0 目錄下執行 ./bin/start-cluster.sh,啟動集群。

    運行成功的話,可以在?http://localhost:8081?訪問到 Flink Web UI。

    另外,還需要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用于后面提交 SQL 任務,如我的路徑是

    FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

    Kafka 本地集群安裝

    下載 Kafka 2.2.0 安裝包并解壓:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

    將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如我的路徑是

    KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

    在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啟動 Kafka 集群。

    在命令行執行 jps,如果看到 Kafka 進程和 QuorumPeerMain 進程即表明啟動成功。

    MySQL 安裝

    可以在官方頁面下載 MySQL 并安裝:
    https://dev.mysql.com/downloads/mysql/
    如果有 Docker 環境的話,也可以直接通過 Docker 安裝
    https://hub.docker.com/_/mysql

    $ docker pull mysql $ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

    然后在 MySQL 中創建一個?flink-test?的數據庫,并按照上文的 schema 創建?pvuv_sink?表。

    提交 SQL 任務

    1.在?flink-sql-submit?目錄下運行?./source-generator.sh,會自動創建?user_behavior topic,并實時往里灌入數據。

    2.在?flink-sql-submit?目錄下運行?./run.sh q1, 提交成功后,可以在 Web UI 中看到拓撲。

    在 MySQL 客戶端,我們也可以實時地看到每個小時的 pv uv 值在不斷地變化

    結尾

    本文帶大家搭建基礎集群環境,并使用 SqlSubmit 提交純 SQL 任務來學習了解如何連接外部系統。flink-sql-submit/src/main/resources/q1.sql?中還有一些注釋掉的調優參數,感興趣的同學可以將參數打開,觀察對作業的影響。關于這些調優參數的原理,可以看下我在?深圳 Meetup?上的分享《Flink SQL 1.9.0 技術內幕和最佳實踐》。


    原文鏈接
    本文為云棲社區原創內容,未經允許不得轉載。

    總結

    以上是生活随笔為你收集整理的Flink 1.9 实战:使用 SQL 读取 Kafka 并写入 MySQL的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。