Integrating Spark with MySQL
First, add the MySQL JDBC driver dependency to the project's pom.xml:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
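Both examples below insert into a MySQL table named person. The article never shows the table's schema, but from the insert statement `insert into person(id,name,age) values(?,?,?)` it would look something like the following (the column types and key are assumptions):

```sql
-- Hypothetical schema inferred from the insert statement in the examples
CREATE TABLE person (
  id   VARCHAR(32) NOT NULL,  -- bound with ps.setString(1, ...)
  name VARCHAR(64),           -- bound with ps.setString(2, ...)
  age  INT,                   -- bound with ps.setInt(3, ...)
  PRIMARY KEY (id)
);
```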
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Data2MysqlForeach {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Data2MysqlForeach").setMaster("local[2]")
    // 2. Build the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // 3. Read the data file
    val data: RDD[String] = sc.textFile("E:\\data\\person.txt")
    // 4. Split each line into (id, name, age)
    val personRDD: RDD[(String, String, Int)] = data.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt))
    // 5. Save the data into the MySQL table.
    //    With foreach, every record opens its own connection to MySQL.
    personRDD.foreach(line => {
      // 1. Get a connection
      val connection: Connection = DriverManager.getConnection("jdbc:mysql://node1:3306/spark", "root", "123456")
      // 2. Define the insert statement
      val sql = "insert into person(id,name,age) values(?,?,?)"
      try {
        // 3. Get a PreparedStatement
        val ps: PreparedStatement = connection.prepareStatement(sql)
        // 4. Bind the record's fields to the ? placeholders
        ps.setString(1, line._1)
        ps.setString(2, line._2)
        ps.setInt(3, line._3)
        ps.execute()
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    })
  }
}
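The code reads a comma-separated text file with one person per line. A hypothetical E:\data\person.txt matching the (id, name, age) split could look like this (the rows are illustrative, not from the original article):

```
1,zhangsan,20
2,lisi,25
3,wangwu,30
```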
Using the foreachPartition operator
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Data2MysqlForeachPartitions {
  def main(args: Array[String]): Unit = {
    // 1. Build the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("Data2MysqlForeachPartitions").setMaster("local[2]")
    // 2. Build the SparkContext
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("warn")
    // 3. Read the data file
    val data: RDD[String] = sc.textFile("E:\\data\\person.txt")
    // 4. Split each line into (id, name, age)
    val personRDD: RDD[(String, String, Int)] = data.map(x => x.split(",")).map(x => (x(0), x(1), x(2).toInt))
    // 5. Save the data into the MySQL table.
    //    foreachPartition opens one connection per partition, reducing
    //    the number of connections made to MySQL.
    personRDD.foreachPartition(iter => {
      // 1. Get a connection
      val connection: Connection = DriverManager.getConnection("jdbc:mysql://node1:3306/spark", "root", "123456")
      // 2. Define the insert statement
      val sql = "insert into person(id,name,age) values(?,?,?)"
      try {
        // 3. Get a PreparedStatement
        val ps: PreparedStatement = connection.prepareStatement(sql)
        // 4. Bind and execute the insert for every record in the partition
        iter.foreach(line => {
          ps.setString(1, line._1)
          ps.setString(2, line._2)
          ps.setInt(3, line._3)
          ps.execute()
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    })
  }
}
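The partition version still issues one round trip per record via ps.execute(). A further refinement, not covered in the original article, is JDBC batching: queue the rows with addBatch and send each partition's rows in bulk with executeBatch. This sketch reuses the same (assumed) connection URL and credentials as the examples above:

```scala
    personRDD.foreachPartition(iter => {
      val connection: Connection = DriverManager.getConnection("jdbc:mysql://node1:3306/spark", "root", "123456")
      val sql = "insert into person(id,name,age) values(?,?,?)"
      try {
        val ps: PreparedStatement = connection.prepareStatement(sql)
        iter.foreach(line => {
          ps.setString(1, line._1)
          ps.setString(2, line._2)
          ps.setInt(3, line._3)
          ps.addBatch()     // queue the row instead of executing immediately
        })
        ps.executeBatch()   // send all queued rows for this partition at once
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        if (connection != null) {
          connection.close()
        }
      }
    })
```

Note that for MySQL Connector/J to actually rewrite the batch into multi-row inserts, the connection URL generally needs rewriteBatchedStatements=true; otherwise the driver still sends the statements one by one.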
Summary
The two versions differ only in where the connection is created: foreach connects to MySQL once per record, while foreachPartition connects once per partition, which greatly reduces the number of connections and the load on the database.