SparkSQL: Custom Strong-Typed and Weak-Typed AVG Aggregate Functions
AVG computes an average, so the output type is Double.
1) Create a weak-typed aggregate function class (extends UserDefinedAggregateFunction)
```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StructType}

class MyAgeFunction extends UserDefinedAggregateFunction {

  // Schema of the function's input: create a StructType and add the fields
  override def inputSchema: StructType = {
    new StructType().add("age", LongType)
  }

  // Schema of the aggregation buffer used during computation
  override def bufferSchema: StructType = {
    new StructType().add("sum", LongType).add("count", LongType)
  }

  // Return type of the function
  override def dataType: DataType = DoubleType

  // Whether the function is deterministic (same input always yields the same output)
  override def deterministic: Boolean = true

  // Initialize the buffer before aggregation:
  // buffer(0) is the first field (sum), buffer(1) is the second field (count)
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  // Update the buffer with each input row: sum += input.getLong(0); count += 1
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
  }

  // Merge buffers coming from different nodes
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Compute the final result
  override def evaluate(buffer: Row): Any = {
    buffer.getLong(0).toDouble / buffer.getLong(1)
  }
}
```

Using the aggregate function:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Spark01_Custom {
  def main(args: Array[String]): Unit = {
    // Create the configuration object and the session
    val conf = new SparkConf().setAppName("Spark01_Custom").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val rdd1 = spark.sparkContext.makeRDD(List(("chun", 21), ("chun1", 23), ("chun3", 22)))

    // Implicit conversions (required to convert an RDD to a DataFrame/Dataset)
    import spark.implicits._

    // RDD to DataFrame
    val frame = rdd1.toDF("name", "age")

    // Create a global temp view
    frame.createGlobalTempView("people")

    // Create and register the aggregate function
    val udaf = new MyAgeFunction
    spark.udf.register("avgAge", udaf)

    // In SQL, a global temp view must be qualified with the global_temp database
    spark.sql("select avgAge(age) from global_temp.people").show
  }
}
```
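For the sample data, this should print a single row with (21 + 23 + 22) / 3 = 22.0. As a side note, registration through SQL is not the only option: in Spark 2.x a `UserDefinedAggregateFunction` instance is itself callable on columns (this API was deprecated in Spark 3.x in favor of `functions.udaf` over an `Aggregator`). A minimal sketch, assuming the `frame` and `udaf` values from the main method above:

```scala
// DSL-style alternative to the SQL query above (Spark 2.x API):
// a UserDefinedAggregateFunction instance can be applied directly to columns.
frame.agg(udaf($"age")).show()
// For the sample data this should also print (21 + 23 + 22) / 3 = 22.0
```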
2) Create a strong-typed AVG aggregate function (extends Aggregator[input type, buffer type, output type])

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Declare the custom aggregate function (strongly typed).
// Aggregator[IN, BUF, OUT] takes three type parameters.
class MyAgeClassFunction extends Aggregator[UserBean, AvgBuffer, Double] {

  // Initialize the buffer
  override def zero: AvgBuffer = AvgBuffer(0, 0)

  // Fold one input value into the buffer
  // (sum and count must be declared as var so they can be mutated)
  override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
    b.sum += a.age
    b.count += 1
    b
  }

  // Merge two buffers
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count
    b1
  }

  // Compute the final result (convert to Double first to avoid integer division)
  override def finish(reduction: AvgBuffer): Double = {
    reduction.sum.toDouble / reduction.count
  }

  // The last two methods define how the buffer and the output are encoded.
  // For a custom case class, use Encoders.product
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product

  // For a built-in type, use the matching primitive encoder, e.g. Encoders.scalaDouble
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Case classes: the input bean and the mutable aggregation buffer
case class UserBean(name: String, age: Int)
case class AvgBuffer(var sum: Int, var count: Int)
```

Usage:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object Spark02_Custom2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark02_Custom2").setMaster("local[*]")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val rdd = spark.sparkContext.makeRDD(List(("chun1", 23), ("chun2", 24), ("chun3", 25)))
    import spark.implicits._

    // Create the strongly typed aggregate function
    val udaf = new MyAgeClassFunction

    // It cannot be registered with spark.udf.register: a registered avgAge(column)
    // would be called with a column value, but this aggregator expects a whole
    // UserBean. Instead, convert the aggregator into a query column:
    val avgColumn = udaf.toColumn.name("avgAge")

    val userRDD = rdd.map {
      case (name, age) => UserBean(name, age)
    }

    // It cannot be used in SQL either; it has to go through the DSL-style select
    val ds = userRDD.toDS
    val rdd1 = ds.rdd

    ds.show()
    // Output:
    // +-----+---+
    // | name|age|
    // +-----+---+
    // |chun1| 23|
    // |chun2| 24|
    // |chun3| 25|
    // +-----+---+

    rdd1.foreach(println)
    // Output (order may vary):
    // UserBean(chun1,23)
    // UserBean(chun3,25)
    // UserBean(chun2,24)

    spark.stop()
  }
}
```

As the output shows, each row produced on the strong-typed path is a UserBean, i.e. a case class instance, not a generic Row as in the weak-typed version.
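Note that the main method above builds `avgColumn` but never actually applies it. A minimal sketch of running the aggregation, assuming the `ds` and `avgColumn` values defined above:

```scala
// Apply the typed aggregator via a DSL-style select on the Dataset[UserBean];
// toColumn produced a TypedColumn, so select returns a Dataset[Double].
ds.select(avgColumn).show()
// For the sample data this should print a single "avgAge" value:
// (23 + 24 + 25) / 3 = 24.0
```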