
SparkSQL: Reading Data from Hive and Two Ways to Do Row-to-Column Transformation (Built-in Functions and a UDAF)


First, the sample data:

vi employees

1,George,nan
2,honey,nv
3,georgedage,nan
4,kangkang,nv

Upload the data to HDFS:

hdfs dfs -mkdir /second
hdfs dfs -put employees /second/

Create the table:

create external table employees(
  emp_no int,
  emp_name String,
  emp_gender String
)
row format delimited fields terminated by ","
location "/second";

Querying the table then shows:

+------+----------+----------+
|emp_no|  emp_name|emp_gender|
+------+----------+----------+
|     1|    George|       nan|
|     2|     honey|        nv|
|     3|georgedage|       nan|
|     4|  kangkang|        nv|
+------+----------+----------+
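For reference, the aligned grid above is the output of DataFrame.show(); a minimal sketch of reproducing it from spark-shell (assuming the table was created in the spark database used later in the article):

// Hypothetical check from spark-shell, where `spark` is the predefined session.
spark.sql("use spark")
spark.sql("select * from employees").show()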


Requirement:

Group the employees by gender and print all the names in each group.

Expected result:

+----------+-----------------+
|emp_gender|             name|
+----------+-----------------+
|        nv|   honey|kangkang|
|       nan|George|georgedage|
+----------+-----------------+

All of the examples below use the Scala API.

Solution 1: use the built-in row-to-column functions

For a more detailed treatment of row-to-column transformation, see:

https://blog.csdn.net/qq_41946557/article/details/102904642

package com.henu

import org.apache.spark.sql.SparkSession

object HiveDemo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local[4]")
      .appName("hd")
      .enableHiveSupport()
      .getOrCreate()
    session.sparkContext.setLogLevel("error")
    session.sql("use spark")
    val frame = session.sql("select emp_gender,concat_ws('|'," +
      "collect_set(employees.emp_name)) name from employees group by emp_gender")
    frame.show()
    session.stop()
  }
}

Result:

+----------+-----------------+
|emp_gender|             name|
+----------+-----------------+
|        nv|   honey|kangkang|
|       nan|George|georgedage|
+----------+-----------------+
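A side note on the built-in functions: collect_set deduplicates names within each group, so repeated names would collapse to one. If duplicates must be preserved, collect_list is the drop-in alternative; a minimal sketch, reusing the session from HiveDemo above:

// Hypothetical variant: collect_list keeps duplicates in arrival order,
// whereas collect_set deduplicates (neither guarantees a stable ordering).
val frameList = session.sql("select emp_gender,concat_ws('|'," +
  "collect_list(employees.emp_name)) name from employees group by emp_gender")
frameList.show()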


Solution 2: use a UDAF

package com.henu

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}

class NameUDAF extends UserDefinedAggregateFunction {
  // Structure of the input data
  override def inputSchema: StructType = {
    StructType(List(StructField("emp_name", StringType)))
  }

  // Structure of the aggregation buffer
  override def bufferSchema: StructType =
    StructType(List(StructField("emp_name", StringType)))

  // Type of the return value
  override def dataType: DataType = StringType

  // Whether the function is deterministic (either value works here)
  override def deterministic: Boolean = true

  // Initialization: set the initial buffer value to the empty string
  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, "")

  // Executed on the workers, once per input row within each partition
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // Get the current buffer value
    var bfValue = buffer.getString(0)
    // The newly arrived value
    val nowValue = input.getString(0)
    if (bfValue == "") {
      bfValue = nowValue
    } else {
      bfValue += "," + nowValue
    }
    // Store the merged value back into the buffer
    buffer.update(0, bfValue)
  }

  // Merge the partial results of all partitions into a single value
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var bfValue = buffer1.getString(0)
    val nowValue = buffer2.getString(0)
    if (bfValue == "") {
      bfValue = nowValue
    } else {
      bfValue += "," + nowValue
    }
    buffer1.update(0, bfValue)
  }

  // Return the value accumulated in the buffer
  override def evaluate(buffer: Row): Any = buffer.getString(0)
}

The driver that registers and uses the UDAF:

package com.henu

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object HiveDemo2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    val session = SparkSession.builder()
      .master("local[4]")
      .appName("hive")
      .enableHiveSupport()
      .getOrCreate()
    session.udf.register("aggr", new NameUDAF())
    session.sql("use spark")
    // Output the head count per gender along with the corresponding names
    val df = session.sql("select emp_gender,count(*) num,aggr(emp_name) names from employees group by emp_gender")
    df.show()
    session.stop()
  }
}

Result:

+----------+---+-----------------+
|emp_gender|num|            names|
+----------+---+-----------------+
|        nv|  2|   honey,kangkang|
|       nan|  2|George,georgedage|
+----------+---+-----------------+
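One caveat on this approach: UserDefinedAggregateFunction is deprecated as of Spark 3.0 in favor of the typed Aggregator API. A minimal sketch of the same aggregation on Spark 3.x (the NameAgg and HiveDemo3 names are illustrative, not from the original article):

package com.henu

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

// Typed replacement for NameUDAF: IN = one name, BUF = names joined so far, OUT = final string.
object NameAgg extends Aggregator[String, String, String] {
  def zero: String = ""                                  // empty buffer
  def reduce(buf: String, name: String): String =        // fold one input row into the buffer
    if (buf.isEmpty) name else buf + "," + name
  def merge(b1: String, b2: String): String =            // combine partial buffers
    if (b1.isEmpty) b2 else if (b2.isEmpty) b1 else b1 + "," + b2
  def finish(buf: String): String = buf                  // the buffer is already the result
  def bufferEncoder: Encoder[String] = Encoders.STRING
  def outputEncoder: Encoder[String] = Encoders.STRING
}

object HiveDemo3 {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .master("local[4]")
      .appName("hive")
      .enableHiveSupport()
      .getOrCreate()
    // functions.udaf wraps the Aggregator so it can be called from SQL.
    session.udf.register("aggr", functions.udaf(NameAgg, Encoders.STRING))
    session.sql("use spark")
    session.sql("select emp_gender, count(*) num, aggr(emp_name) names " +
      "from employees group by emp_gender").show()
    session.stop()
  }
}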

A friendly reminder: remember to put hive-site.xml into the resources directory. If your cluster is set up for high availability, you will also need the steps described here:

https://blog.csdn.net/qq_41946557/article/details/103457503
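If placing hive-site.xml on the classpath is not an option, the metastore location can also be supplied programmatically when building the session. A minimal sketch, assuming a Thrift metastore (the host and port below are placeholders, not taken from the article):

// Hypothetical: point Spark at the Hive metastore without hive-site.xml.
val session = SparkSession.builder()
  .master("local[4]")
  .appName("hive")
  .config("hive.metastore.uris", "thrift://node01:9083") // placeholder host/port
  .enableHiveSupport()
  .getOrCreate()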

Summary

This article showed two ways to pivot rows to columns when reading Hive data with SparkSQL: the built-in concat_ws and collect_set functions, and a custom UDAF, which additionally makes it easy to return extra aggregates such as a per-group count.