Spark SQL: Executing Spark SQL queries programmatically (inferring the Schema via reflection, and specifying the Schema directly with StructType)
1. Writing a Spark SQL query program
Before starting, create a Maven project; the steps are described at http://blog.csdn.net/tototuzuoquan/article/details/74571374.
The post at http://blog.csdn.net/tototuzuoquan/article/details/74907124 showed how to run SQL queries from the Spark shell. Here we instead write a Spark SQL query program in our own application. First, add the Spark SQL dependency to the Maven project's pom.xml:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

The final pom.xml looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.toto.spark</groupId>
    <artifactId>bigdata</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <spark.version>1.6.2</spark.version>
        <hadoop.version>2.6.4</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Preparing the run parameters
The content of person.txt is as follows:
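(The original screenshot of person.txt is not preserved here. Judging from the parsing code below, each line holds a space-separated id, name, and age; a purely hypothetical example file would look like this:)

1 zhangsan 19
2 lisi 20
3 wangwu 28
4 zhaoliu 26
5 tianqi 24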
3. Inferring the Schema via reflection
package cn.toto.spark

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/10.
  */
object InferringSchema {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the app name (add setMaster("local") when running
    // locally; leave it out when running on the cluster)
    val conf = new SparkConf().setAppName("SQL-1").setMaster("local")
    // SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given path
    val lineRDD = sc.textFile(args(0)).map(_.split(" "))
    // Associate the RDD with the case class (Person, defined below)
    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
    // Import implicit conversions; without this import the RDD cannot be converted to a DataFrame
    import sqlContext.implicits._
    // Convert the RDD into a DataFrame
    val personDF = personRDD.toDF
    // Register a temporary table
    personDF.registerTempTable("t_person")
    // Run the SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 2")
    // Save the result as JSON to the given path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}

// The case class must be defined outside the object
case class Person(id: Int, name: String, age: Int)

Parameter configuration:
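(The original run-configuration screenshot is not preserved. The program expects two arguments: args(0) is the path to person.txt and args(1) is an output directory that must not already exist. A hypothetical "Program arguments" value for a local run in the IDE might be:)

E:\sparkdata\person.txt E:\sparkdata\out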
Run the program; the result is as follows:
Package the program into a jar, upload it to the Spark cluster, and submit the Spark job (make sure to remove setMaster("local") from the code first):
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-submit \
  --class cn.toto.spark.InferringSchema \
  --master spark://hadoop1:7077,hadoop2:7077 \
  /home/tuzq/software/sparkdata/bigdata-1.0-SNAPSHOT.jar \
  hdfs://mycluster/person.txt hdfs://mycluster/out
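After the job finishes, the JSON written to hdfs://mycluster/out can be checked from the command line, for example (a sketch; the exact part-file names depend on the Spark version):

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -cat hdfs://mycluster/out/part-*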
4. Specifying the Schema directly with StructType

The code is as follows:
package cn.toto.spark

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/10.
  */
object SpecifyingSchema {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf and set the app name
    val conf = new SparkConf().setAppName("SQL-2").setMaster("local")
    // SQLContext depends on SparkContext
    val sc = new SparkContext(conf)
    // Create the SQLContext
    val sqlContext = new SQLContext(sc)
    // Create an RDD from the given path
    val personRDD = sc.textFile(args(0)).map(_.split(" "))
    // Specify the schema of each field directly with StructType; this is essentially the table's description
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // Map the RDD to a Row RDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    // Apply the schema to the Row RDD
    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // Register a temporary table
    personDataFrame.registerTempTable("t_person")
    // Execute the SQL
    val df = sqlContext.sql("select * from t_person order by age desc limit 4")
    // Save the result as JSON to the given path
    df.write.json(args(1))
    // Stop the SparkContext
    sc.stop()
  }
}

Run parameter configuration:
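(As with the first program, the run-configuration screenshot is not preserved; the two program arguments are again the person.txt input path and a not-yet-existing output directory, for example, hypothetically:)

E:\sparkdata\person.txt E:\sparkdata\out2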
The result after running:
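Note: the cluster used above runs Spark 2.1.1, where SQLContext and registerTempTable are deprecated. As a minimal sketch (not part of the original article; it assumes a Spark 2.x spark-sql dependency in the pom and uses the hypothetical object name SpecifyingSchema2), the same StructType example with the SparkSession API looks roughly like this:

package cn.toto.spark

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SpecifyingSchema2 {
  def main(args: Array[String]): Unit = {
    // SparkSession replaces SQLContext in Spark 2.x
    val spark = SparkSession.builder().appName("SQL-2").master("local").getOrCreate()
    // Same explicit schema as above
    val schema = StructType(List(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // Build the Row RDD from the text file
    val rowRDD = spark.sparkContext.textFile(args(0))
      .map(_.split(" "))
      .map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
    val personDF = spark.createDataFrame(rowRDD, schema)
    // createOrReplaceTempView replaces registerTempTable
    personDF.createOrReplaceTempView("t_person")
    spark.sql("select * from t_person order by age desc limit 4").write.json(args(1))
    spark.stop()
  }
}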
Summary

Both approaches above build a DataFrame from an RDD and query it with SQL: the first infers the schema via reflection from a case class, while the second constructs the schema explicitly with StructType and Row objects.