UDAF Case Analysis in SparkSQL
1. Counting word occurrences
package com.bynear.spark_sql;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;

public class Spark_UDAF extends UserDefinedAggregateFunction {
    /**
     * inputSchema: the data type of the input column(s).
     */
    @Override
    public StructType inputSchema() {
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("str", DataTypes.StringType, true));
        return DataTypes.createStructType(fields);
    }

    /**
     * bufferSchema: the data type(s) of the intermediate aggregation buffer.
     */
    @Override
    public StructType bufferSchema() {
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("count", DataTypes.IntegerType, true));
        return DataTypes.createStructType(fields);
    }

    /**
     * dataType: the return type of the function.
     */
    @Override
    public DataType dataType() {
        return DataTypes.IntegerType;
    }

    /**
     * Consistency flag: if true, the function always produces the same result
     * for the same input.
     */
    @Override
    public boolean deterministic() {
        return true;
    }

    /**
     * Sets the initial value of the aggregation buffer. The initial value must satisfy
     * this invariant: merging two initial buffers with the merge method below must again
     * yield an initial buffer. For example, if the initial value were 1 and merge performed
     * an addition, merging two initial buffers would give 2, which is no longer the initial
     * buffer; such an initial value would be wrong. That is why it is also called the
     * "zero value". Initialization runs once per group.
     */
    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, 0);
    }

    /**
     * Updates the buffer with one input row, similar to combineByKey: for each group,
     * this defines how the group's aggregate changes when a new value arrives.
     */
    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        buffer.update(0, Integer.valueOf(buffer.getAs(0).toString()) + 1);
    }

    /**
     * Merges two buffers by folding buffer2 into buffer1; it is called when partial
     * aggregates from different partitions are combined, similar to reduceByKey.
     * Note that the method has no return value: the merge result must be written into
     * buffer1. Because Spark is distributed, one group's data may be partially aggregated
     * (via update) on different nodes, and those partial results are merged here.
     */
    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        buffer1.update(0, Integer.valueOf(buffer1.getAs(0).toString())
                + Integer.valueOf(buffer2.getAs(0).toString()));
    }

    /**
     * Computes the final result of a group from its aggregation buffer.
     */
    @Override
    public Object evaluate(Row buffer) {
        return buffer.getInt(0);
    }
}

The driver program below builds a small DataFrame of names, registers the UDAF as countString, and calls it from a GROUP BY query:

package com.bynear.spark_sql;

import com.clearspring.analytics.util.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class UDAF {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("UDAF").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        List<String> nameList = Arrays.asList("xiaoming", "xiaoming", "劉德華", "古天樂",
                "feifei", "feifei", "feifei", "katong");
        // Convert to a JavaRDD
        JavaRDD<String> nameRDD = sc.parallelize(nameList, 3);
        // Convert to a JavaRDD<Row>
        JavaRDD<Row> nameRowRDD = nameRDD.map(new Function<String, Row>() {
            public Row call(String name) throws Exception {
                return RowFactory.create(name);
            }
        });
        List<StructField> fields = Lists.newArrayList();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame namesDF = sqlContext.createDataFrame(nameRowRDD, structType);
        namesDF.registerTempTable("names");
        sqlContext.udf().register("countString", new Spark_UDAF());
        sqlContext.sql("select name, countString(name) as count from names group by name").show();
        List<Row> rows = sqlContext
                .sql("select name, countString(name) as count from names group by name")
                .javaRDD().collect();
        for (Row row : rows) {
            System.out.println(row);
        }
    }
}

Run result:
+--------+-----+
|    name|count|
+--------+-----+
|  feifei|    3|
|xiaoming|    2|
|     劉德華|    1|
|  katong|    1|
|     古天樂|    1|
+--------+-----+
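For reference, the registered UDAF does not have to be invoked through a SQL string. Assuming Spark 1.5 or later (where org.apache.spark.sql.functions.callUDF(String, Column...) is available) and the sqlContext, namesDF and countString registration from the code above, the same query can be expressed with the DataFrame API; this is a minimal sketch, not part of the original example:

    // Additional import needed: org.apache.spark.sql.functions
    // Equivalent to "select name, countString(name) as count from names group by name"
    DataFrame countedDF = namesDF.groupBy(functions.col("name"))
            .agg(functions.callUDF("countString", functions.col("name")).as("count"));
    countedDF.show();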
2. Computing the average price per brand
package com.bynear.spark_sql;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;

public class MyUDAF extends UserDefinedAggregateFunction {
    private StructType inputSchema;
    private StructType bufferSchema;

    public MyUDAF() {
        ArrayList<StructField> inputFields = new ArrayList<StructField>();
        inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.DoubleType, true));
        inputSchema = DataTypes.createStructType(inputFields);
        ArrayList<StructField> bufferFields = new ArrayList<StructField>();
        bufferFields.add(DataTypes.createStructField("sum", DataTypes.DoubleType, true));
        bufferFields.add(DataTypes.createStructField("count", DataTypes.DoubleType, true));
        bufferSchema = DataTypes.createStructType(bufferFields);
    }

    @Override
    public StructType inputSchema() {
        return inputSchema;
    }

    @Override
    public StructType bufferSchema() {
        return bufferSchema;
    }

    @Override
    public DataType dataType() {
        return DataTypes.DoubleType;
    }

    @Override
    public boolean deterministic() {
        return true;
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        // Buffer slot 0 holds the running sum, initialized to 0
        // Buffer slot 1 holds the running count, initialized to 0
        buffer.update(0, 0.0);
        buffer.update(1, 0.0);
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        // Only update when the input value at index 0 is not null
        if (!input.isNullAt(0)) {
            // Slot 0 (sum): current buffered sum + incoming value
            double updatedSum = buffer.getDouble(0) + input.getDouble(0);
            // Slot 1 (count): current buffered count + 1
            double updatedCount = buffer.getDouble(1) + 1;
            buffer.update(0, updatedSum);
            buffer.update(1, updatedCount);
        }
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        double mergedSum = buffer1.getDouble(0) + buffer2.getDouble(0);
        double mergedCount = buffer1.getDouble(1) + buffer2.getDouble(1);
        buffer1.update(0, mergedSum);
        buffer1.update(1, mergedCount);
    }

    @Override
    public Object evaluate(Row buffer) {
        return buffer.getDouble(0) / buffer.getDouble(1);
    }
}

The driver program reads sales.txt, registers MyUDAF as myAverage together with a small rounding UDF, and queries the average price per brand:

package com.bynear.spark_sql;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.math.BigDecimal;
import java.util.ArrayList;

public class MyUDAF_SQL {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("myUDAF").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(jsc);
        JavaRDD<String> lines = jsc.textFile("C://Users//Administrator//Desktop//fastJSon//sales.txt");
        JavaRDD<Row> map = lines.map(new Function<String, Row>() {
            @Override
            public Row call(String line) throws Exception {
                String[] lineSplit = line.split(",");
                return RowFactory.create(String.valueOf(lineSplit[0]), Double.valueOf(lineSplit[1]));
            }
        });
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("salary", DataTypes.DoubleType, true));
        StructType structType = DataTypes.createStructType(fields);
        DataFrame df = sqlContext.createDataFrame(map, structType);
        sqlContext.udf().register("myAverage", new MyUDAF());
        df.registerTempTable("zjs_table");
        df.show();
        // A plain UDF that rounds a double to two decimal places
        sqlContext.udf().register("twoDecimal", new UDF1<Double, Double>() {
            @Override
            public Double call(Double in) throws Exception {
                BigDecimal b = new BigDecimal(in);
                return b.setScale(2, BigDecimal.ROUND_HALF_DOWN).doubleValue();
            }
        }, DataTypes.DoubleType);
        DataFrame resultDF = sqlContext.sql(
                "select name, twoDecimal(myAverage(salary)) as 平均值 from zjs_table group by name");
        resultDF.show();
    }
}
Input file (sales.txt):
三星,1542
三星,1548
三星,8456
三星,8866
中興,1856
中興,1752
蘋果,1500
蘋果,2500
蘋果,3500
蘋果,4500
蘋果,5500
Run result:
+----+-------+
|name| salary|
+----+-------+
|  三星|12345.0|
|  三星| 4521.0|
|  三星| 7895.0|
|  華為| 5421.0|
|  華為| 4521.0|
|  華為| 5648.0|
|  蘋果|12548.0|
|  蘋果| 7856.0|
|  蘋果|45217.0|
|  蘋果|89654.0|
+----+-------+
+----+--------+
|name|     平均值|
+----+--------+
|  三星| 8253.67|
|  華為| 5196.67|
|  蘋果|38818.75|
+----+--------+
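A note on the rounding step: the custom twoDecimal UDF uses BigDecimal.ROUND_HALF_DOWN. On Spark 1.5 or later the built-in round(expr, scale) function can trim the average to two decimals without a separate UDF, although it rounds halves up rather than down, so results can differ on exact ties. This is a minimal sketch, assuming the myAverage registration and zjs_table temp table from the code above:

    // Same grouping query, rounding with the built-in round() instead of twoDecimal
    DataFrame roundedDF = sqlContext.sql(
            "select name, round(myAverage(salary), 2) as 平均值 from zjs_table group by name");
    roundedDF.show();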
Points to note: the encoding of the input text file (it contains Chinese text, so save it in an encoding Spark can read correctly, such as UTF-8), and making sure the schema types declared in the Java code, such as DataTypes.DoubleType for the salary column, match the actual values placed in each Row.
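To make the DataTypes.DoubleType point concrete, here is a small hypothetical illustration (not from the original post): RowFactory.create does not validate types, so the value stored for the salary field must actually be a java.lang.Double to match the declared DoubleType; a mismatch only surfaces as a runtime error when the DataFrame is evaluated.

    String[] parts = "三星,1542".split(",");
    // Matches the declared schema (StringType, DoubleType)
    Row ok = RowFactory.create(parts[0], Double.valueOf(parts[1]));
    // Type mismatch: a String where DoubleType was declared; this fails only
    // when the DataFrame is actually evaluated, e.g. by show()
    Row bad = RowFactory.create(parts[0], parts[1]);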
Summary
Both examples implement UserDefinedAggregateFunction by defining the input schema, the buffer schema, the return type, and the initialize/update/merge/evaluate lifecycle, then registering the function with sqlContext.udf().register() and invoking it from a GROUP BY query.