Spark _25_ Loading Data from Hive into a DataFrame/DataSet (Part 4)


Hive is not local here, so the workflow is a little more involved. Still, if you read the errors carefully it goes fine; if your Hadoop cluster runs in HA mode, pay extra attention:

One error is worth calling out; if you hit the same kind of error, see: https://georgedage.blog.csdn.net/article/details/103086882

Loading data from Hive into a DataFrame/DataSet

  • HiveContext is a subclass of SQLContext; HiveContext is the recommended way to connect to Hive.
  • Since there is no local Hive environment, the job has to be submitted to the cluster. Submit command:
spark-submit --master spark://henu1:7077 --executor-cores 1 --executor-memory 2G --total-executor-cores 1 --class com.henu.HiveDemo /root/Hive-1.0-SNAPSHOT-jar-with-dependencies.jar

Java API:

package com.henu;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

/**
 * @author George
 * @description
 * Load data from Hive into a DataFrame
 */
public class HiveDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        // Careful: a master set in code takes precedence over the
        // --master flag passed to spark-submit.
        conf.setMaster("local");
        conf.setAppName("hive");
        SparkContext sc = new SparkContext(conf);
        // HiveContext is a subclass of SQLContext.
        /*
         * Friendly reminder: as of Spark 2.3.1, HiveContext is superseded by
         * SparkSession.builder().enableHiveSupport()
         */
        HiveContext hiveContext = new HiveContext(sc);
        hiveContext.sql("use spark");
        hiveContext.sql("drop table if exists student_infos");
        // Create the student_infos table in Hive
        hiveContext.sql("create table if not exists student_infos(name String,age Int)" +
                "row format delimited fields terminated by ' '");
        hiveContext.sql("load data local inpath '/root/test/student_info' into table student_infos");
        hiveContext.sql("drop table if exists student_scores");
        hiveContext.sql("create table if not exists student_scores (name String,score Int)" +
                "row format delimited fields terminated by ' '");
        hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores");
        /*
         * Query the tables to produce a Dataset
         */
        Dataset<Row> dataset = hiveContext.sql("SELECT si.name, si.age, ss.score "
                + "FROM student_infos si "
                + "JOIN student_scores ss "
                + "ON si.name=ss.name "
                + "WHERE ss.score>=80");
//        dataset.show();
        hiveContext.sql("drop table if exists good_student_infos");
        dataset.registerTempTable("goodstudent");
//        Dataset<Row> sql = hiveContext.sql("create table good_student_infos as select * from goodstudent");
        Dataset<Row> sql = hiveContext.sql("select * from goodstudent");
        sql.show();
        /*
         * Save the result to the Hive table good_student_infos
         */
        dataset.write().mode(SaveMode.Overwrite).saveAsTable("good_student_infos");
        sc.stop();
    }
}
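As the comment in the code says, HiveContext is superseded in Spark 2.x. For reference, a minimal sketch of the same query using SparkSession.builder().enableHiveSupport() (assuming a Spark 2.x dependency; this is not the variant actually submitted in this post):

package com.henu;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HiveSessionDemo {
    public static void main(String[] args) {
        // enableHiveSupport() connects the session to the Hive metastore,
        // taking over the role that HiveContext plays above.
        SparkSession spark = SparkSession.builder()
                .appName("hive")
                .enableHiveSupport()
                .getOrCreate();
        spark.sql("use spark");
        Dataset<Row> df = spark.sql(
                "SELECT si.name, si.age, ss.score " +
                "FROM student_infos si JOIN student_scores ss " +
                "ON si.name = ss.name WHERE ss.score >= 80");
        df.show();
        spark.stop();
    }
}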

Package it as a jar and run it on Linux:

Per the code, note the local data source files:
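The tables are declared with fields terminated by ' ', so each source file holds one space-separated record per line. Judging from the query results further down, /root/test/student_info contains:

George 22
kangkang 20
GeorgeDage 28
limu 1

The contents of /root/test/student_scores are not shown in this post; hypothetically it would look like this (same format, name and score):

George 85
kangkang 90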


Don't forget to start the Hadoop cluster first, then run the submit command. (It can be a bit slow.)

Then go into Hive and check whether it worked:

0: jdbc:hive2://henu2:10000> use spark;
No rows affected (0.35 seconds)
0: jdbc:hive2://henu2:10000> show tables;
+-----------------+--+
|    tab_name     |
+-----------------+--+
| student_infos   |
| student_scores  |
+-----------------+--+
2 rows selected (0.81 seconds)
0: jdbc:hive2://henu2:10000> select * from student_infos;
+---------------------+--------------------+--+
| student_infos.name  | student_infos.age  |
+---------------------+--------------------+--+
| George              | 22                 |
| kangkang            | 20                 |
| GeorgeDage          | 28                 |
| limu                | 1                  |
+---------------------+--------------------+--+
4 rows selected (0.967 seconds)

Scala API:

package com.henu

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveScalaDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("hive")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("use spark")
    hiveContext.sql("drop table if exists student_infos")
    // Create the student_infos table in Hive
    hiveContext.sql("create table if not exists student_infos(name String,age Int)" +
      "row format delimited fields terminated by ' '")
    hiveContext.sql("load data local inpath '/root/test/student_info' into table student_infos")
    hiveContext.sql("drop table if exists student_scores")
    hiveContext.sql("create table if not exists student_scores (name String,score Int)" +
      "row format delimited fields terminated by ' '")
    hiveContext.sql("load data local inpath '/root/test/student_scores' into table student_scores")
    val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name")
    hiveContext.sql("drop table if exists good_student_infos")
    /*
     * Write the result to the Hive table good_student_infos
     */
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
    sc.stop()
  }
}

To be honest, I don't know where that new table went; the step that writes the result into the Hive table did not succeed. Corrections are welcome.

So I came up with a workaround:

I modified the code; it is identical to the Java version above except for this single line:

Dataset<Row> sql = hiveContext.sql("create table good_student_infos as select * from goodstudent");

It looks like it succeeded. (Note that a DDL statement like this CTAS returns an empty Dataset, so sql.show() itself prints nothing useful; the table is created and populated as a side effect.)

Then query the tables again. However!!!

0: jdbc:hive2://henu2:10000> show tables;
+---------------------+--+
|      tab_name       |
+---------------------+--+
| good_student_infos  |
| student_infos       |
| student_scores      |
+---------------------+--+
3 rows selected (0.653 seconds)
0: jdbc:hive2://henu2:10000> select * from good_student_infos;
+--------------------------+-------------------------+---------------------------+--+
| good_student_infos.name  | good_student_infos.age  | good_student_infos.score  |
+--------------------------+-------------------------+---------------------------+--+
+--------------------------+-------------------------+---------------------------+--+

But at least the table got created; only the data insert is still a problem. A likely culprit: the final dataset.write().mode(SaveMode.Overwrite).saveAsTable(...) call runs after the CTAS and replaces the table the CTAS had just populated, and that write is the same step that failed before.
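One untested idea, sketched here rather than verified on this cluster: drop the final saveAsTable call and instead append into an explicitly created table with DataFrameWriter.insertInto, which writes into the existing Hive table definition rather than replacing it. This would slot in at the end of HiveDemo.main in place of the last write:

        // Sketch only, not verified here: create the target table explicitly,
        // then append the joined rows into it.
        hiveContext.sql("drop table if exists good_student_infos");
        hiveContext.sql("create table good_student_infos (name String, age Int, score Int) " +
                "row format delimited fields terminated by ' '");
        // insertInto matches columns by position against the existing table.
        dataset.write().mode(SaveMode.Append).insertInto("good_student_infos");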

I made further changes, but I won't post that code, because the data still did not make it in. See you next time...

