日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

SparkSQL操作Hive Table

發(fā)布時(shí)間:2024/1/17 数据库 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkSQL操作Hive Table 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Spark SQL支持對(duì)Hive的讀寫操作。然而因?yàn)镠ive有很多依賴包,所以這些依賴包沒有包含在默認(rèn)的Spark包里面。如果Hive依賴的包能在classpath找到,Spark將會(huì)自動(dòng)加載它們。需要注意的是,這些Hive依賴包必須復(fù)制到所有的工作節(jié)點(diǎn)上,因?yàn)樗鼈優(yōu)榱四軌蛟L問存儲(chǔ)在Hive的數(shù)據(jù),會(huì)調(diào)用Hive的序列化和反序列化(SerDes)包。Hive的配置文件hive-site.xmlcore-site.xml(security配置)和hdfs-site.xml(HDFS配置)是保存在conf目錄下面。?
當(dāng)使用Hive時(shí),必須初始化一個(gè)支持Hive的SparkSession,用戶即使沒有部署一個(gè)Hive的環(huán)境仍然可以使用Hive。當(dāng)沒有配置hive-site.xml時(shí),Spark會(huì)自動(dòng)在當(dāng)前應(yīng)用目錄創(chuàng)建metastore_db和創(chuàng)建由spark.sql.warehouse.dir配置的目錄,如果沒有配置,默認(rèn)是當(dāng)前應(yīng)用目錄下的spark-warehouse目錄。?
注意:從Spark 2.0.0版本開始,hive-site.xml里面的hive.metastore.warehouse.dir屬性已經(jīng)被spark.sql.warehouse.dir替代,用于指定warehouse的默認(rèn)數(shù)據(jù)路徑(必須有寫權(quán)限)。

import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public static class Record implements Serializable { private int key; private String value; public int getKey() { return key; } public void setKey(int key) { this.key = key; } public String getValue() { return value; } public void setValue(String value) { this.value = value; } } // warehouseLocation points to the default location for managed databases and tables String warehouseLocation = "/spark-warehouse"; // init spark session with hive support SparkSession spark = SparkSession .builder() .appName("Java Spark Hive Example") .master("local[*]") .config("spark.sql.warehouse.dir", warehouseLocation) .enableHiveSupport() .getOrCreate(); spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)"); spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src"); // Queries are expressed in HiveQL spark.sql("SELECT * FROM src").show(); // +---+-------+ // |key| value| // +---+-------+ // |238|val_238| // | 86| val_86| // |311|val_311| // ... // only showing top 20 rows // Aggregation queries are also supported. spark.sql("SELECT COUNT(*) FROM src").show(); // +--------+ // |count(1)| // +--------+ // | 500 | // +--------+ // The results of SQL queries are themselves DataFrames and support all normal functions. Dataset<Row> sqlDF = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key"); // The items in DaraFrames are of type Row, which lets you to access each column by ordinal. Dataset<String> stringsDS = sqlDF.map(row -> "Key: " + row.get(0) + ", Value: " + row.get(1), Encoders.STRING()); stringsDS.show(); // +--------------------+ // | value| // +--------------------+ // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // |Key: 0, Value: val_0| // ... // You can also use DataFrames to create temporary views within a SparkSession. List<Record> records = new ArrayList<Record>(); for (int key = 1; key < 100; key++) { Record record = new Record(); record.setKey(key); record.setValue("val_" + key); records.add(record); } Dataset<Row> recordsDF = spark.createDataFrame(records, Record.class); recordsDF.createOrReplaceTempView("records"); // Queries can then join DataFrames data with data stored in Hive. spark.sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show(); // +---+------+---+------+ // |key| value|key| value| // +---+------+---+------+ // | 2| val_2| 2| val_2| // | 2| val_2| 2| val_2| // | 4| val_4| 4| val_4| // ... // only showing top 20 rows

如果使用eclipse運(yùn)行上述代碼的話需要添加spark-hive的jars,下面是maven的配置:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.11 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.11</artifactId> <version>2.1.0</version> </dependency>

否則的話會(huì)遇到下面錯(cuò)誤:

Exception in thread "main" java.lang.IllegalArgumentException: Unable to instantiate SparkSession with Hive support because Hive classes are not found. at org.apache.spark.sql.SparkSession$Builder.enableHiveSupport(SparkSession.scala:815) at JavaSparkHiveExample.main(JavaSparkHiveExample.java:17)

與不同版本Hive Metastore的交互

Spark SQL對(duì)Hive的支持其中一個(gè)最重要的部分是與Hive metastore的交互,使得Spark SQL可以訪問Hive表的元數(shù)據(jù)。從Spark 1.4.0版本開始,Spark SQL使用下面的配置可以用于查詢不同版本的Hive metastores。需要注意的是,本質(zhì)上Spark SQL會(huì)使用編譯后的Hive 1.2.1版本的那些類來用于內(nèi)部操作(serdes、UDFs、UDAFs等等)。

總結(jié)

以上是生活随笔為你收集整理的SparkSQL操作Hive Table的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。