日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 运维知识 > 数据库 >内容正文

数据库

SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总

發布時間:2023/12/10 数据库 29 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

AVG是求平均值,所以輸出類型是Double類型

1)創建弱類型聚合函數類extends UserDefinedAggregateFunction

class MyAgeFunction extends UserDefinedAggregateFunction {//函數輸入的數據結構,需要new一個具體的結構對象,然后添加結構override def inputSchema: StructType = {new StructType().add("age",LongType)}//計算時的數據結構override def bufferSchema: StructType = {new StructType().add("sum",LongType).add("conut",LongType)}//函數返回的數據類型override def dataType: DataType = DoubleType//表述函數是否穩定override def deterministic: Boolean = true//表述的是函數計算之前的緩沖區的初始化 buffer(0)表示第一個結構:sum, buffer(1)示第二個結構:countoverride def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}//根據查詢結構來更新緩沖區數據sum + = input.getLong count+=1override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}//將多個節點的緩沖區合并override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}//計算override def evaluate(buffer: Row): Any = {buffer.getLong(0).toDouble / buffer.getLong(1)} }

聚合函數使用

def main(args: Array[String]): Unit = {//創建配置對象val conf = new SparkConf().setAppName("Spark01_Custom").setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val rdd1 = spark.sparkContext.makeRDD(List(("chun",21),("chun1",23),("chun3",22)))//隱士轉換(RDD轉換DF/DS需要引入隱式轉換)import spark.implicits._// rdd轉DFval frame = rdd1.toDF("name","age")//創建全局視圖frame.createGlobalTempView("people")//創建聚合函數對象val udaf = new MyAgeFunction//注冊聚合函數spark.udf.register("avgAge",udaf)//frame.select("age").show()//sql 這里表名要把全局名也寫上spark.sql("select avgAge(age) from global_temp.people").show}

2)創建強類型聚合函數AVG(extends Aggregator[輸入類型,緩沖區類型,輸出類型])

//聲明自定義聚合函數(強類型) //case class Aggregator[K, V, C] (這里由三個泛型) class MyAgeClassFuction extends Aggregator[UserBean,AvgBuffer,Double]{//初始化緩沖區override def zero: AvgBuffer = AvgBuffer(0,0)//AvgBuffer = 把輸入的數據更新進緩沖區override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {//sum和count要設置為var的b.sum += a.ageb.count += 1b}//合并緩沖區override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {b1.sum = b1.sum + b2.sumb1.count = b1.count + b2.countb1}//計算結果override def finish(reduction: AvgBuffer): Double = {reduction.sum / reduction.count}//后倆都是數據變成類型之后的轉碼操作//第一個是自定義的類型,就用Encoders.productoverride def bufferEncoder: Encoder[AvgBuffer] = Encoders.product//如果不是自定義類型就用Encoders.scalaBooleanoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble }//樣例類 case class UserBean(name : String, age : Int) case class AvgBuffer(var sum : Int, var count : Int)

使用

def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Spark02_Custom2").setMaster("local[*]")val spark = SparkSession.builder().config(conf).getOrCreate()val rdd = spark.sparkContext.makeRDD(List(("chun1",23),("chun2",24),("chun3",25)))import spark.implicits._rdd.toDF("name","age")//自定義強類型聚合函數val udaf = new MyAgeClassFuction//這里不能注冊,加入注冊了名為avgAge,使用的時候是avgAge(字段),但是傳入的應該是Bean對象,所以不可以這樣寫//需要將聚合函數轉換為查詢列val avgColumn = udaf.toColumn.name("avgAge")val userRDD = rdd.map {case (name, age) => {UserBean(name, age)}}//在sql里肯定沒辦法用,需要使用DSL風格select函數val ds = userRDD.toDSval rdd1 = ds.rddds.show()/****結果:+-----+---+| name|age|+-----+---+|chun1| 23||chun2| 24||chun3| 25|+-----+---+**/rdd1.foreach(println)//結果://UserBean(chun1,23)//UserBean(chun3,25)//UserBean(chun2,24)spark.stop()}

可以看到強類型聚合函數輸出的結果每一行都是UserBean類型的,是樣例類類型,并不像弱類型一樣是row

總結

以上是生活随笔為你收集整理的SparkSQL自定义AVG强类型聚合函数与弱类型聚合函数汇总的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。