
Spark _24_ Creating a DataFrame/DataSet from JDBC Data (MySQL Example) (Part 3)

發(fā)布時(shí)間:2024/2/28 编程问答 33 豆豆

Two ways to create a DataSet

現(xiàn)在數(shù)據(jù)庫(kù)中創(chuàng)建表不能給插入少量數(shù)據(jù)。


Java API:

package SparkSql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.HashMap;
import java.util.Map;

/**
 * @author George
 * @description
 * Read data over JDBC to create a DataFrame (MySQL example).
 * Two ways to create the DataFrame.
 */
public class JDBCDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("jdbc");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        /*
         * First way: read the MySQL table and load it as a DataFrame.
         */
        Map<String, String> options = new HashMap<>();
        options.put("url", "jdbc:mysql://localhost:3306/spark");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user", "root");
        options.put("password", "123456");
        options.put("dbtable", "person");
        // options() adds input options for the underlying data source.
        Dataset<Row> person = sqlContext.read().format("jdbc").options(options).load();
        person.show();
        /*
         * +---+----------+---+
         * | id|      name|age|
         * +---+----------+---+
         * |  1|    George| 22|
         * |  2|  kangkang| 20|
         * |  3|GeorgeDage| 28|
         * |  4|    limumu|  1|
         * +---+----------+---+
         */
        person.registerTempTable("person");

        /*
         * Second way: read the MySQL table and load it as a DataFrame.
         */
        DataFrameReader reader = sqlContext.read().format("jdbc");
        reader.option("url", "jdbc:mysql://localhost:3306/spark");
        reader.option("driver", "com.mysql.jdbc.Driver");
        reader.option("user", "root");
        reader.option("password", "123456");
        reader.option("dbtable", "score");
        Dataset<Row> load = reader.load();
        load.show();
        /*
         * +---+----------+-----+
         * | id|      name|score|
         * +---+----------+-----+
         * |  1|    George|  100|
         * |  2|  kangkang|  100|
         * |  3|GeorgeDage|   90|
         * |  4|    limumu|  120|
         * +---+----------+-----+
         */
        load.registerTempTable("score");
        sc.stop();
    }
}
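Note that both reading styles repeat the same four connection options for every table. A small plain-Java sketch (no Spark dependency; the `JdbcOptions` class and its method names are hypothetical) shows one way to build them once, swap only `dbtable` per table, and derive the `java.util.Properties` that the JDBC write path used later expects:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class JdbcOptions {
    // Build the shared data-source options once.
    static Map<String, String> baseOptions() {
        Map<String, String> options = new HashMap<>();
        options.put("url", "jdbc:mysql://localhost:3306/spark");
        options.put("driver", "com.mysql.jdbc.Driver");
        options.put("user", "root");
        options.put("password", "123456");
        return options;
    }

    // Per-table reads only differ in the "dbtable" option.
    static Map<String, String> forTable(String table) {
        Map<String, String> options = new HashMap<>(baseOptions());
        options.put("dbtable", table);
        return options;
    }

    // The write path (DataFrameWriter.jdbc) takes java.util.Properties,
    // so derive one from the same map instead of retyping the credentials.
    static Properties asProperties(Map<String, String> options) {
        Properties props = new Properties();
        props.setProperty("user", options.get("user"));
        props.setProperty("password", options.get("password"));
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> person = forTable("person");
        System.out.println(person.get("dbtable"));                    // prints "person"
        System.out.println(asProperties(person).getProperty("user")); // prints "root"
    }
}
```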

Scala API:

并將組合的數(shù)據(jù)重新插入到mysql中

package SparkSql

import java.util.Properties

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object JDBCScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("jdbc").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("error")
    val sqlContext = new SQLContext(sc)

    /*
     * First way: read the MySQL table into a DataFrame.
     */
    val options = new mutable.HashMap[String, String]()
    options.put("url", "jdbc:mysql://localhost:3306/spark")
    options.put("driver", "com.mysql.jdbc.Driver")
    options.put("user", "root")
    options.put("password", "123456")
    options.put("dbtable", "person")
    val person = sqlContext.read.format("jdbc").options(options).load()
    person.show()
    person.registerTempTable("person")

    /*
     * Second way: read the MySQL table into a DataFrame.
     */
    val reader = sqlContext.read.format("jdbc")
    reader.option("url", "jdbc:mysql://localhost:3306/spark")
    reader.option("driver", "com.mysql.jdbc.Driver")
    reader.option("user", "root")
    reader.option("password", "123456")
    reader.option("dbtable", "score")
    val frame = reader.load()
    frame.show()
    frame.registerTempTable("score")
    /*
     * +---+----------+---+
     * | id|      name|age|
     * +---+----------+---+
     * |  1|    George| 22|
     * |  2|  kangkang| 20|
     * |  3|GeorgeDage| 28|
     * |  4|    limumu|  1|
     * +---+----------+---+
     *
     * +---+----------+-----+
     * | id|      name|score|
     * +---+----------+-----+
     * |  1|    George|  100|
     * |  2|  kangkang|  100|
     * |  3|GeorgeDage|   90|
     * |  4|    limumu|  120|
     * +---+----------+-----+
     */

    val result = sqlContext.sql(
      "select person.id, person.name, person.age, score.score " +
      "from person, score where person.id = score.id")
    result.show()
    /*
     * +---+----------+---+-----+
     * | id|      name|age|score|
     * +---+----------+---+-----+
     * |  1|    George| 22|  100|
     * |  3|GeorgeDage| 28|   90|
     * |  4|    limumu|  1|  120|
     * |  2|  kangkang| 20|  100|
     * +---+----------+---+-----+
     */

    /*
     * Write the joined result back into MySQL.
     */
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")
    result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/spark", "result", properties)

    sc.stop()
  }
}

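The cross-table query in the Scala code is a plain inner join on `id`, which is why all four rows survive (every person id also appears in score); an id present in only one table would be dropped. Purely to illustrate that semantics (hypothetical class, no Spark involved), the same join over in-memory maps:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class InnerJoinSketch {
    // Inner join: emit "id,personValue,score" only for ids present in BOTH maps.
    public static List<String> join(Map<Integer, String> person, Map<Integer, Integer> score) {
        List<String> rows = new ArrayList<>();
        for (Map.Entry<Integer, String> p : person.entrySet()) {
            Integer s = score.get(p.getKey());
            if (s != null) { // id missing from score -> row dropped, as in the SQL join
                rows.add(p.getKey() + "," + p.getValue() + "," + s);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Data mirrors the person and score tables shown above; values are "name,age".
        Map<Integer, String> person = new LinkedHashMap<>();
        person.put(1, "George,22");
        person.put(2, "kangkang,20");
        person.put(3, "GeorgeDage,28");
        person.put(4, "limumu,1");
        Map<Integer, Integer> score = new LinkedHashMap<>();
        score.put(1, 100);
        score.put(2, 100);
        score.put(3, 90);
        score.put(4, 120);
        join(person, score).forEach(System.out::println);
    }
}
```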
結(jié)果:去mysql查看:?
