flink读mysql速度怎么样_[DB] Flink 读 MySQL
思路
在 Flink 中創建一張表有兩種方法:
從一個文件中導入表結構(Structure)(常用于批計算)(靜態)
從 DataStream 或者 DataSet 轉換成 Table (動態)
package com.kaikeba.mysql.demo
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.types.Row
object Flink2Mysql {
def main(args: Array[String]): Unit = {
//設定執行環境
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = BatchTableEnvironment.create(env)
//通過創建JDBCInputFormat讀取JDBC數據源
val jdbcDataSet: DataSet[Row] =
env.createInput(JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("com.mysql.cj.jdbc.Driver")
.setDBUrl("jdbc:mysql://127.0.0.1:3306/flink-mysql?serverTimezone=GMT%2B8&characterEncoding=UTF-8&useSSL=false")
.setUsername("root")
.setPassword("Chen1227+")
.setQuery("select * from filter")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
)
//將DataSet注冊為表
tEnv.registerDataSet("tb", jdbcDataSet)
//執行查詢操作
val table = tEnv.sqlQuery("select * from tb")
//把table轉為DataSet
tEnv.toDataSet[Row](table).print()
}
}
?
參考
Flink 讀寫 Mysql
Flink流處理訪問MySQL
Flink實例
總結
以上是生活随笔為你收集整理的flink读mysql速度怎么样_[DB] Flink 读 MySQL的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 银行卡信息生成
- 下一篇: linux cmake编译源码,linu