日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

spark整合MySQL

發布時間:2024/8/23 数据库 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark整合MySQL 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

spark整合MySQL

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency>

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Data2MysqlForeach {
def main(args: Array[String]): Unit = {
//1、構建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName(“Data2MysqlForeach”).setMaster(“local[2]”)

//2、構建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn")//3、讀取數據文件 val data: RDD[String] = sc.textFile("E:\\data\\person.txt")//4、切分每一行 // id name age val personRDD: RDD[(String, String, Int)] = data.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt))//5、把數據保存到mysql表中personRDD.foreach(line =>{//每條數據與mysql建立連接//把數據插入到mysql表操作//1、獲取連接val connection: Connection = DriverManager.getConnection("jdbc:mysql://node1:3306/spark","root","123456")//2、定義插入數據的sql語句val sql="insert into person(id,name,age) values(?,?,?)"//3、獲取PreParedStatementtry {val ps: PreparedStatement = connection.prepareStatement(sql)//4、獲取數據,給?號 賦值ps.setString(1, line._1)ps.setString(2, line._2)ps.setInt(3, line._3)ps.execute()} catch {case e:Exception => e.printStackTrace()} finally {if(connection !=null){connection.close()}}})}

使用 foreachPartition 算子
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Data2MysqlForeachPartitions {
def main(args: Array[String]): Unit = {
//1、構建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName(“Data2MysqlForeachPartitions”).setMaster(“local[2]”)

//2、構建SparkContext val sc = new SparkContext(sparkConf) sc.setLogLevel("warn")//3、讀取數據文件 val data: RDD[String] = sc.textFile("E:\\data\\person.txt")//4、切分每一行 // id name age val personRDD: RDD[(String, String, Int)] = data.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt))//5、把數據保存到mysql表中 //使用foreachPartition每個分區建立一次鏈接,減少與mysql鏈接次數 personRDD.foreachPartition( iter =>{//把數據插入到mysql表操作//1、獲取連接val connection: Connection = DriverManager.getConnection("jdbc:mysql://node1:3306/spark","root","123456")//2、定義插入數據的sql語句val sql="insert into person(id,name,age) values(?,?,?)"//3、獲取PreParedStatementtry {val ps: PreparedStatement = connection.prepareStatement(sql)//4、獲取數據,給?號 賦值iter.foreach(line =>{ps.setString(1, line._1)ps.setString(2, line._2)ps.setInt(3, line._3)ps.execute()})} catch {case e:Exception => e.printStackTrace()} finally {if(connection !=null){connection.close()}} }

總結

以上是生活随笔為你收集整理的spark整合MySQL的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。