Spark _25 _ Reading Hive Data into a DataFrame/Dataset (Part 4)
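Because Hive is not installed on my local machine, this is a bit of a hassle. With some care and a close reading of the error messages it is manageable, though. If your Hadoop cluster is set up for HA, pay extra attention.
Here is one error I ran into. If you hit the same kind of error, this may help: https://georgedage.blog.csdn.net/article/details/103086882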
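Reading Hive data into a DataFrame/Dataset
- HiveContext is a subclass of SQLContext; HiveContext is the recommended way to connect to Hive.
- There is no Hive environment on the local machine, so the job has to be submitted to the cluster to run.
Java API: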
```java
package com.henu;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

/**
 * @author George
 * @description
 * Read data from Hive and load it into a DataFrame
 **/
public class HiveDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // A master set in code takes precedence over the --master flag of spark-submit.
        conf.setMaster("local");
        conf.setAppName("hive");
        SparkContext sc = new SparkContext(conf);
        // HiveContext is a subclass of SQLContext.
        /**
         * Note: HiveContext has been deprecated since Spark 2.0
         * (so also in 2.3.1) in favor of
         * SparkSession.builder().enableHiveSupport()
         */
        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.sql("use spark");
        hiveContext.sql("drop table if exists student_infos");
        // Create the student_infos table in Hive
        hiveContext.sql("create table if not exists student_infos(name String,age Int)" +
                "row format delimited fields terminated by ' '");
        hiveContext.sql("load data local inpath '/root/test/student_info' into table student_infos");

        hiveContext.sql("drop table if exists student_scores");
        hiveContext.sql("create table if not exists student_scores (name String,score Int)" +
                "row format delimited fields terminated by ' '");
        hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores");

        /**
         * Query the tables to produce a Dataset
         */
        Dataset<Row> dataset = hiveContext.sql("SELECT si.name, si.age, ss.score "
                + "FROM student_infos si "
                + "JOIN student_scores ss "
                + "ON si.name=ss.name "
                + "WHERE ss.score>=80");
//        dataset.show();
        hiveContext.sql("drop table if exists good_student_infos");
        dataset.registerTempTable("goodstudent");
//        Dataset<Row> sql = hiveContext.sql("create table good_student_infos as select * from goodstudent");
        Dataset<Row> sql = hiveContext.sql("select * from goodstudent");
        sql.show();

        /**
         * Save the result to the Hive table good_student_infos
         */
        dataset.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
        sc.stop();
    }
}
```
Package it as a jar and run it on Linux.
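As the code shows, the source data files must exist locally on the machine that runs the job: `/root/test/student_info` and `/root/test/student_scores`, each line holding two fields separated by a single space.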
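The input files can be written by hand, but a small generator makes the expected layout explicit. The following is a minimal sketch, not the author's tooling: the class name `StudentFileWriter`, its methods, and the default output directory `test-data` are made up for illustration, and the four rows are copied from the `student_infos` query output shown later in this post.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: writes rows in the layout the Hive tables expect.
class StudentFileWriter {

    /** Joins the fields of each row with one space, matching "fields terminated by ' '". */
    static List<String> toLines(List<String[]> rows) {
        List<String> lines = new ArrayList<>();
        for (String[] row : rows) {
            for (String field : row) {
                // A space inside a field would be read by Hive as a field separator.
                if (field.contains(" ")) {
                    throw new IllegalArgumentException("field contains a space: " + field);
                }
            }
            lines.add(String.join(" ", row));
        }
        return lines;
    }

    /** Writes one line per row to the target file, creating parent directories if needed. */
    static void write(Path target, List<String[]> rows) throws IOException {
        Path parent = target.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Files.write(target, toLines(rows), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Assumption: output directory comes from the first argument, else "test-data".
        Path dir = Paths.get(args.length > 0 ? args[0] : "test-data");
        List<String[]> infos = Arrays.asList(
                new String[] {"George", "22"},
                new String[] {"kangkang", "20"},
                new String[] {"GeorgeDage", "28"},
                new String[] {"limu", "1"});
        Path file = dir.resolve("student_info");
        write(file, infos);
        System.out.println("wrote " + file.toAbsolutePath());
    }
}
```

The generated file then has to be copied to `/root/test/student_info` on the node that runs the job, since that is the path the `load data local inpath` statement reads. `student_scores` follows the same layout, with a name and a score on each line.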
Then don't forget to start the Hadoop cluster, and submit the job. [It is a bit slow.]
After that, go into Hive and check whether it worked:
```
0: jdbc:hive2://henu2:10000> use spark;
No rows affected (0.35 seconds)
0: jdbc:hive2://henu2:10000> show tables;
+-----------------+--+
|    tab_name     |
+-----------------+--+
| student_infos   |
| student_scores  |
+-----------------+--+
2 rows selected (0.81 seconds)
0: jdbc:hive2://henu2:10000> select * from student_infos;
+---------------------+--------------------+--+
| student_infos.name  | student_infos.age  |
+---------------------+--------------------+--+
| George              | 22                 |
| kangkang            | 20                 |
| GeorgeDage          | 28                 |
| limu                | 1                  |
+---------------------+--------------------+--+
4 rows selected (0.967 seconds)
```
Scala API:
```scala
package com.henu

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("hive")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("use spark")
    hiveContext.sql("drop table if exists student_infos")
    // Create the student_infos table in Hive
    hiveContext.sql("create table if not exists student_infos(name String,age Int)" +
      "row format delimited fields terminated by ' '")
    hiveContext.sql("load data local inpath '/root/test/student_info' into table student_infos")

    hiveContext.sql("drop table if exists student_scores")
    hiveContext.sql("create table if not exists student_scores (name String,score Int)" +
      "row format delimited fields terminated by ' '")
    hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")

    val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
    hiveContext.sql("drop table if exists good_student_infos")

    /**
     * Write the result to a Hive table
     **/
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
    sc.stop()
  }
}
```
To be honest, I don't know where the new table went, that is, the Hive table the result was supposed to be written to. [It did not work.] Corrections are welcome.
Then I came up with a workaround and changed the code:
```java
package com.henu;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

/**
 * @author George
 * @description
 * Read data from Hive and load it into a DataFrame
 **/
public class HiveDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // A master set in code takes precedence over the --master flag of spark-submit.
        conf.setMaster("local");
        conf.setAppName("hive");
        SparkContext sc = new SparkContext(conf);
        // HiveContext is a subclass of SQLContext.
        /**
         * Note: HiveContext has been deprecated since Spark 2.0
         * (so also in 2.3.1) in favor of
         * SparkSession.builder().enableHiveSupport()
         */
        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.sql("use spark");
        hiveContext.sql("drop table if exists student_infos");
        // Create the student_infos table in Hive
        hiveContext.sql("create table if not exists student_infos(name String,age Int)" +
                "row format delimited fields terminated by ' '");
        hiveContext.sql("load data local inpath '/root/test/student_info' into table student_infos");

        hiveContext.sql("drop table if exists student_scores");
        hiveContext.sql("create table if not exists student_scores (name String,score Int)" +
                "row format delimited fields terminated by ' '");
        hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores");

        /**
         * Query the tables to produce a Dataset
         */
        Dataset<Row> dataset = hiveContext.sql("SELECT si.name, si.age, ss.score "
                + "FROM student_infos si "
                + "JOIN student_scores ss "
                + "ON si.name=ss.name "
                + "WHERE ss.score>=80");
//        dataset.show();
        hiveContext.sql("drop table if exists good_student_infos");
        dataset.registerTempTable("goodstudent");
//        Dataset<Row> sql = hiveContext.sql("create table good_student_infos as select * from goodstudent");
        Dataset<Row> sql = hiveContext.sql("create table good_student_infos as select * from goodstudent");
        sql.show();

        /**
         * Save the result to the Hive table good_student_infos
         */
        dataset.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
        sc.stop();
    }
}
```
The change is this one statement:
```java
Dataset<Row> sql = hiveContext.sql("create table good_student_infos as select * from goodstudent");
```
It looked as if that had worked.
But then I checked the tables again:
```
0: jdbc:hive2://henu2:10000> show tables;
+---------------------+--+
|      tab_name       |
+---------------------+--+
| good_student_infos  |
| student_infos       |
| student_scores      |
+---------------------+--+
3 rows selected (0.653 seconds)
0: jdbc:hive2://henu2:10000> select * from good_student_infos;
+--------------------------+-------------------------+---------------------------+--+
| good_student_infos.name  | good_student_infos.age  | good_student_infos.score  |
+--------------------------+-------------------------+---------------------------+--+
+--------------------------+-------------------------+---------------------------+--+
```
At least the table now exists; what remains is the problem of getting the data into it.
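The manual check in beeline can also be scripted, which is handy when the job is rerun several times. This is a minimal sketch under stated assumptions: a HiveServer2 JDBC driver (for example the hive-jdbc jar) is on the classpath, the server is the `henu2:10000` instance seen in the beeline prompt, and the user name and password are passed as command-line arguments. The class `HiveRowCountCheck` and its methods are hypothetical names introduced here.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper: counts the rows of a Hive table over JDBC.
class HiveRowCountCheck {

    /** Builds a HiveServer2 JDBC URL such as jdbc:hive2://henu2:10000/spark. */
    static String jdbcUrl(String host, int port, String database) {
        return "jdbc:hive2://" + host + ":" + port + "/" + database;
    }

    /** Builds the counting query; only plain identifier characters are accepted. */
    static String countQuery(String table) {
        if (!table.matches("[A-Za-z_][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("unexpected table name: " + table);
        }
        return "SELECT COUNT(*) FROM " + table;
    }

    /** Opens a connection, runs the counting query, and returns the single result value. */
    static long countRows(String url, String user, String password, String table)
            throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(countQuery(table))) {
            rs.next();
            return rs.getLong(1);
        }
    }

    public static void main(String[] args) {
        // Host, port, and database are taken from the beeline session in this post.
        String url = jdbcUrl("henu2", 10000, "spark");
        // Assumption: credentials are supplied as arguments; empty strings otherwise.
        String user = args.length > 0 ? args[0] : "";
        String password = args.length > 1 ? args[1] : "";
        try {
            long rows = countRows(url, user, password, "good_student_infos");
            System.out.println("good_student_infos rows: " + rows);
        } catch (SQLException e) {
            // Reached when the driver is missing or the server cannot be contacted.
            System.err.println("could not query Hive: " + e.getMessage());
        }
    }
}
```

A count of 0 corresponds to the empty result shown above: the table was created, but no rows are visible through Hive.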
I changed the code once more, but I won't post it here, because the data still did not get loaded. See you next time...