日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程语言 > python >内容正文

python

python读取oracle数据到hvie parquet_关于sparksql操作hive,读取本地csv文件并以parquet的形式装入hive中...

發(fā)布時(shí)間:2023/12/15 python 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python读取oracle数据到hvie parquet_关于sparksql操作hive,读取本地csv文件并以parquet的形式装入hive中... 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

說(shuō)明:spark版本:2.2.0

hive版本:1.2.1

需求: 有本地csv格式的一個(gè)文件,格式為${當(dāng)天日期}visit.txt,例如20180707visit.txt,現(xiàn)在需要將其通過(guò)spark-sql程序?qū)崿F(xiàn)將該文件讀取并以parquet的格式通過(guò)外部表的形式保存到hive中,最終要實(shí)現(xiàn)通過(guò)傳參的形式,將該日期區(qū)間內(nèi)的csv文件批量加載進(jìn)去,方式有兩種:

1、之傳入一個(gè)參數(shù),說(shuō)明只加載一天的數(shù)據(jù)進(jìn)去

2、傳入兩個(gè)參數(shù),批量加載這兩個(gè)日期區(qū)間的每一天的數(shù)據(jù)

最終打成jar包,進(jìn)行運(yùn)行

步驟如下:

1、初始化配置,先創(chuàng)建sparkSession(spark2.0版本開(kāi)始將sqlContext、hiveContext同意整合為sparkSession)

//初始化配置

val spark = new sql.SparkSession

.Builder()

.enableHiveSupport()  //操作hive這一步千萬(wàn)不能少

.appName("project_1")

.master("local[2]")

.getOrCreate()

2、先將文件讀進(jìn)來(lái),并轉(zhuǎn)換為DF

val data = spark.read.option("inferSchema", "true").option("header", "false") //這里設(shè)置是否處理頭信息,false代表不處理,也就是說(shuō)文件的第一行也會(huì)被加載進(jìn)來(lái),如果設(shè)置為true,那么加載進(jìn)來(lái)的數(shù)據(jù)中不包含第一行,第一行被當(dāng)作了頭信息,也就是表中的字段名處理了

.csv(s"file:///home/spark/file/project/${i}visit.txt")  //這里設(shè)置讀取的文件,${i}是我引用的一個(gè)變量,如果要在雙引號(hào)之間引用變量的話(huà),括號(hào)前面的那個(gè)s不能少

.toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time") //將讀進(jìn)來(lái)的數(shù)據(jù)轉(zhuǎn)換為DF,并為每個(gè)字段設(shè)置字段名

3、將轉(zhuǎn)換后的DF注冊(cè)為一張臨時(shí)表

data.createTempView(s"table_${i}")

4、通過(guò)spark-sql創(chuàng)建hive外部表,這里有坑

spark.sql(

s"""

|create external table if not exists ${i}visit

|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,

|region string, screen string, stay_time int) stored as parquet

|location 'hdfs://master:9000/project_dest/${i}'

""".stripMargin)

這里的見(jiàn)表語(yǔ)句需要特別注意,如果寫(xiě)成如下的方式是錯(cuò)誤的:

spark.sql(

s"""

|create external table if not exists ${i}visit

|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,

|region string, screen string, stay_time int) row format delimited fields terminated by '\t' stored as parquet

|location /project_dest/${i}'

""".stripMargin)

(1)對(duì)于row format delimited fields terminated by '\t'這語(yǔ)句只支持存儲(chǔ)文件格式為textFile,對(duì)于parquet文件格式不支持

(2)對(duì)于location這里,一定要寫(xiě)hdfs的全路徑,如果向上面這樣寫(xiě),系統(tǒng)不認(rèn)識(shí),切記

5、通過(guò)spark-sql執(zhí)行insert語(yǔ)句,將數(shù)據(jù)插入到hive表中

spark.sql(s"insert overwrite table ${i}visit select * from table_${i}".stripMargin)

至此,即完成了將本地?cái)?shù)據(jù)以parquet的形式加載至hive表中了,接下來(lái)既可以到hive表中進(jìn)行查看數(shù)據(jù)是否成功載入

貼一下完整代碼:

package _sql.project_1

import org.apache.spark.sql

/**

* Author Mr. Guo

* Create 2018/9/4 - 9:04

* ┌───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┬───┐ ┌───┬───┬───┐

* │Esc│ │ F1│ F2│ F3│ F4│ │ F5│ F6│ F7│ F8│ │ F9│F10│F11│F12│ │P/S│S L│P/B│ ┌┐ ┌┐ ┌┐

* └───┘ └───┴───┴───┴───┘ └───┴───┴───┴───┘ └───┴───┴───┴───┘ └───┴───┴───┘ └┘ └┘ └┘

* ┌───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───┬───────┐ ┌───┬───┬───┐ ┌───┬───┬───┬───┐

* │~ `│! 1│@ 2│# 3│$ 4│% 5│^ 6│& 7│* 8│( 9│) 0│_ -│+ =│ BacSp │ │Ins│Hom│PUp│ │N L│ / │ * │ - │

* ├───┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─────┤ ├───┼───┼───┤ ├───┼───┼───┼───┤

* │ Tab │ Q │ W │ E │ R │ T │ Y │ U │ I │ O │ P │{ [│} ]│ | \ │ │Del│End│PDn│ │ 7 │ 8 │ 9 │ │

* ├─────┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴┬──┴─────┤ └───┴───┴───┘ ├───┼───┼───┤ + │

* │ Caps │ A │ S │ D │ F │ G │ H │ J │ K │ L │: ;│" '│ Enter │ │ 4 │ 5 │ 6 │ │

* ├──────┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴────────┤ ┌───┐ ├───┼───┼───┼───┤

* │ Shift │ Z │ X │ C │ V │ B │ N │ M │< ,│> .│? /│ Shift │ │ ↑ │ │ 1 │ 2 │ 3 │ │

* ├─────┬──┴─┬─┴──┬┴───┴───┴───┴───┴───┴──┬┴───┼───┴┬────┬────┤ ┌───┼───┼───┐ ├───┴───┼───┤ E││

* │ Ctrl│ │Alt │ Space │ Alt│ │ │Ctrl│ │ ← │ ↓ │ → │ │ 0 │ . │←─┘│

* └─────┴────┴────┴───────────────────────┴────┴────┴────┴────┘ └───┴───┴───┘ └───────┴───┴───┘

**/

object Spark_Sql_Load_Data_To_Hive {

//初始化配置

val spark = new sql.SparkSession

.Builder()

.enableHiveSupport()

.appName("project_1")

.master("local[2]")

.getOrCreate()

//設(shè)置日志的級(jí)別

spark.sparkContext.setLogLevel("WARN")

def main(args: Array[String]): Unit = {

try {

if (args.length != 1) {

data_load(args(0).toInt)

} else if (args.length != 2) {

for (i

data_load(i)

}

} else {

System.err.println("Usage: or ")

System.exit(1)

}

}catch {

case ex:Exception => println("Exception")

}finally{

spark.stop()

}

}

def data_load(i:Int): Unit = {

println(s"*******data_${i}********")

val data = spark.read.option("inferSchema", "true").option("header", "false")

.csv(s"file:///home/spark/file/project/${i}visit.txt")

.toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time")

data.createTempView(s"table_${i}")

spark.sql("use project_1".stripMargin)

spark.sql(

s"""

|create external table if not exists ${i}visit

|(mac string, phone_brand string, enter_time timestamp, first_time timestamp, last_time timestamp,

|region string, screen string, stay_time int) stored as parquet

|location 'hdfs://master:9000/project_dest/${i}'

""".stripMargin)

spark

.sql(s"insert overwrite table ${i}visit select * from table_${i}".stripMargin)

}

}

6、打成jar包(我的IDEA版本是2017.3版本)

如果沒(méi)有上面這一欄,點(diǎn)擊View,然后勾選Toolbar即可

點(diǎn)擊ok

此時(shí)這里會(huì)成成這么一個(gè)文件,是編譯之后的class文件

到這個(gè)目錄下會(huì)找到這么一個(gè)jar包

找到該文件夾,上傳到服務(wù)器,cd到該目錄下運(yùn)行命令:

spark-submit --class spark._sql.project_1.Conn_hive --master spark://master:7077 --executor-memory 2g --num-executors 3 /spark_maven_project.jar 20180901 20180910

總結(jié)

以上是生活随笔為你收集整理的python读取oracle数据到hvie parquet_关于sparksql操作hive,读取本地csv文件并以parquet的形式装入hive中...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。