Reading Hive data with SparkSQL: two ways to convert rows to columns [built-in row-to-column functions, UDAF]
First, the sample data:
vi employees

1,George,nan
2,honey,nv
3,georgedage,nan
4,kangkang,nv

Upload the data to HDFS:
hdfs dfs -mkdir /second
hdfs dfs -put employees /second/

Create the external table:
create external table employees(
  emp_no int,
  emp_name String,
  emp_gender String
)
row format delimited fields terminated by ","
location "/second";

Querying the table (select * from employees) returns:

+------+----------+----------+
|emp_no|  emp_name|emp_gender|
+------+----------+----------+
|     1|    George|       nan|
|     2|     honey|        nv|
|     3|georgedage|       nan|
|     4|  kangkang|        nv|
+------+----------+----------+
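(A quick aside, not from the original post.) Because the table is external and the delimiter is declared as ",", it can be worth confirming that the raw HDFS file actually parses into the three declared columns. A minimal sketch, assuming the Hadoop configs are on the classpath; the object name RawFileCheck is made up for illustration:

package com.henu

import org.apache.spark.sql.SparkSession

// Sketch: read the raw HDFS file directly to confirm the comma delimiter
// lines up with the three columns declared in the external table.
object RawFileCheck {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local[4]")
      .appName("raw-check")
      .getOrCreate()
    session.read
      .option("sep", ",")
      .csv("/second/employees")
      .toDF("emp_no", "emp_name", "emp_gender")
      .show()
    session.stop()
  }
}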
Requirement:

Group by gender and output all of the names in each group.

Expected output:
+----------+-----------------+
|emp_gender|             name|
+----------+-----------------+
|        nv|   honey|kangkang|
|       nan|George|georgedage|
+----------+-----------------+

Both solutions below use the Scala API.
Solution 1: use the built-in row-to-column functions
For a detailed write-up on row-to-column conversion, see:
https://blog.csdn.net/qq_41946557/article/details/102904642
package com.henu

import org.apache.spark.sql.SparkSession

object HiveDemo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local[4]")
      .appName("hd")
      .enableHiveSupport()
      .getOrCreate()
    session.sparkContext.setLogLevel("error")
    session.sql("use spark")
    // collect_set gathers the distinct names in each gender group into an
    // array; concat_ws then joins that array into one string with "|"
    val frame = session.sql("select emp_gender,concat_ws('|'," +
      "collect_set(employees.emp_name)) name from employees group by emp_gender")
    frame.show()
    session.stop()
  }
}

Result:
+----------+-----------------+
|emp_gender|             name|
+----------+-----------------+
|        nv|   honey|kangkang|
|       nan|George|georgedage|
+----------+-----------------+
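Note that collect_set deduplicates names within each group. If duplicate names must be kept (say, two employees who share a name), collect_list is the drop-in alternative. A minimal sketch under that assumption; the object name HiveDemoList is illustrative, and everything else mirrors HiveDemo above:

package com.henu

import org.apache.spark.sql.SparkSession

// Sketch: same query as HiveDemo, but collect_list keeps duplicate
// names per group (collect_set would silently drop them).
object HiveDemoList {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local[4]")
      .appName("hd-list")
      .enableHiveSupport()
      .getOrCreate()
    session.sparkContext.setLogLevel("error")
    session.sql("use spark")
    val frame = session.sql("select emp_gender,concat_ws('|'," +
      "collect_list(employees.emp_name)) name from employees group by emp_gender")
    frame.show()
    session.stop()
  }
}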
Solution 2: use a UDAF (user-defined aggregate function)
package com.henu

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

class NameUDAF extends UserDefinedAggregateFunction {
  // Schema of the input data
  override def inputSchema: StructType =
    StructType(List(StructField("emp_name", StringType)))

  // Schema of the aggregation buffer
  override def bufferSchema: StructType =
    StructType(List(StructField("emp_name", StringType)))

  // Type of the returned value
  override def dataType: DataType = StringType

  // Whether the same input always produces the same output (either works here)
  override def deterministic: Boolean = true

  // Initialization: start with an empty string in the buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, "")

  // Per-partition update, executed on the workers
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Current value in the buffer
    var bfValue = buffer.getString(0)
    // Newly arrived value
    val nowValue = input.getString(0)
    if (bfValue == "") {
      bfValue = nowValue
    } else {
      bfValue += "," + nowValue
    }
    // Write the merged value back into the buffer
    buffer.update(0, bfValue)
  }

  // Merge the partial results of all partitions into one value
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var bfValue = buffer1.getString(0)
    val nowValue = buffer2.getString(0)
    if (bfValue == "") {
      bfValue = nowValue
    } else {
      bfValue += "," + nowValue
    }
    buffer1.update(0, bfValue)
  }

  // Return the final value stored in the buffer
  override def evaluate(buffer: Row): Any = buffer.getString(0)
}

And the driver that registers and uses it:

package com.henu

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object HiveDemo2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val session = SparkSession.builder()
      .master("local[4]")
      .appName("hive")
      .enableHiveSupport()
      .getOrCreate()
    session.udf.register("aggr", new NameUDAF())
    session.sql("use spark")
    // Output the number of people per gender along with their names
    val df = session.sql("select emp_gender,count(*) num,aggr(emp_name) names from employees group by emp_gender")
    df.show()
    session.stop()
  }
}

Result:
+----------+---+-----------------+
|emp_gender|num|            names|
+----------+---+-----------------+
|        nv|  2|   honey,kangkang|
|       nan|  2|George,georgedage|
+----------+---+-----------------+

A friendly reminder:
Remember to put hive-site.xml into the resources directory. If your cluster is set up for high availability, you will also need:
https://blog.csdn.net/qq_41946557/article/details/103457503
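One caveat the original post doesn't mention: UserDefinedAggregateFunction is deprecated since Spark 3.0 in favor of the typed Aggregator registered via functions.udaf. A sketch of the same comma-joining aggregation in that style (the object names NameAgg and HiveDemo3 are illustrative):

package com.henu

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}

// Typed replacement for NameUDAF on Spark 3.x:
// IN = String (one name), BUF = String (names joined so far), OUT = String.
object NameAgg extends Aggregator[String, String, String] {
  def zero: String = ""
  def reduce(buf: String, name: String): String =
    if (buf.isEmpty) name else buf + "," + name
  def merge(b1: String, b2: String): String =
    if (b1.isEmpty) b2 else if (b2.isEmpty) b1 else b1 + "," + b2
  def finish(buf: String): String = buf
  def bufferEncoder: Encoder[String] = Encoders.STRING
  def outputEncoder: Encoder[String] = Encoders.STRING
}

object HiveDemo3 {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local[4]")
      .appName("hive3")
      .enableHiveSupport()
      .getOrCreate()
    // functions.udaf wraps the typed Aggregator so it can be called from SQL
    session.udf.register("aggr", functions.udaf(NameAgg))
    session.sql("use spark")
    session.sql("select emp_gender,count(*) num,aggr(emp_name) names " +
      "from employees group by emp_gender").show()
    session.stop()
  }
}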
Summary
This post walked through two ways to convert rows to columns when reading Hive data with SparkSQL: the built-in concat_ws/collect_set (or collect_list) route, and a hand-written UDAF. Hopefully it helps you solve the problem you ran into.