Writing UDF, UDAF, and UDTF Functions (Hive)
I. Writing a UDF
1. Steps
Option 1 (simple UDF): 1. Extend the UDF class. 2. Override the evaluate method.
Option 2 (GenericUDF): 1. Extend GenericUDF. 2. Implement the initialize, evaluate, and getDisplayString methods.
2. Example
Implementing a lower function, first with the simple UDF API:
package com.xxx.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class LowerUDF extends UDF {

    public Text evaluate(Text input) {
        if (null == input) {
            return null;
        }
        String inputValue = input.toString().trim();
        if (inputValue.isEmpty()) {
            return null;
        }
        return new Text(inputValue.toLowerCase());
    }
}
The same function written with the GenericUDF API:

package com.xxx.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;

public class LowerUDF extends GenericUDF {

    private StringObjectInspector str;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Check the number of arguments
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("Invalid number of arguments: exactly one argument is expected");
        }
        // Check the argument type
        if (!(arguments[0] instanceof StringObjectInspector)) {
            throw new UDFArgumentException("Invalid argument: a string argument is expected");
        }
        str = (StringObjectInspector) arguments[0];
        // Declare the return type (a Java String)
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        String input = str.getPrimitiveJavaObject(arguments[0].get());
        if (input == null) {
            return null;
        }
        // Return a Java String, matching javaStringObjectInspector declared in initialize()
        return input.toLowerCase();
    }

    @Override
    public String getDisplayString(String[] children) {
        return "to_lower(str) - returns str in lowercase";
    }
}
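For a quick local check before packaging, the GenericUDF can be driven the same way Hive drives it: call initialize with the argument inspectors, then evaluate with the argument wrapped in a DeferredJavaObject. A minimal sketch (the test class name is only illustrative):

package com.xxx.udf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class LowerUDFLocalTest {
    public static void main(String[] args) throws Exception {
        LowerUDF udf = new LowerUDF();

        // Tell the UDF it will receive a single Java String argument
        ObjectInspector[] argOIs = { PrimitiveObjectInspectorFactory.javaStringObjectInspector };
        udf.initialize(argOIs);

        // Hive passes each argument as a DeferredObject; DeferredJavaObject wraps a plain Java value
        Object result = udf.evaluate(new GenericUDF.DeferredObject[] {
                new GenericUDF.DeferredJavaObject("DSJIFASD")
        });
        System.out.println(result);  // expected: dsjifasd
    }
}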
3. Package into a jar and upload
mvn clean package
4. Create a temporary function in Hive
add jar /home/xxx/yf/to_lower.jar;
create temporary function to_lower as 'com.xxx.udf.LowerUDF';
select to_lower("DSJIFASD") from dual;
drop temporary function to_lower;
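A temporary function disappears when the session ends. If the function should survive across sessions, Hive also supports permanent functions backed by a jar on HDFS; a sketch, assuming the jar has been uploaded to a hypothetical HDFS path:

create function to_lower as 'com.xxx.udf.LowerUDF' using jar 'hdfs:///user/xxx/to_lower.jar';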
II. Writing a UDAF
1. Steps
1. Extend AbstractGenericUDAFResolver. 2. Have the evaluator extend GenericUDAFEvaluator. 3. The evaluator must implement init, iterate, terminatePartial, merge, and terminate:
init: initializes the evaluator and declares the return type
iterate: processes each input row
terminatePartial: returns the intermediate result built up by iterate (similar to a Hadoop Combiner)
merge: merges the intermediate results
terminate: returns the final value
2. Example
Implementing avg (using the older UDAF/UDAFEvaluator API, which newer Hive versions deprecate in favor of the GenericUDAF interfaces):
package com.xxx.udf;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

public class Avg extends UDAF {

    public static class AvgState {
        private long mCount;
        private double mSum;
    }

    public static class AvgEvaluator implements UDAFEvaluator {

        AvgState state;

        public AvgEvaluator() {
            super();
            state = new AvgState();
            init();
        }

        /**
         * init behaves like a constructor and initializes the UDAF state.
         */
        public void init() {
            state.mSum = 0;
            state.mCount = 0;
        }

        /**
         * iterate receives the value of one input row and folds it into the
         * running state. Its return type is boolean.
         */
        public boolean iterate(Double o) {
            if (o != null) {
                state.mSum += o;
                state.mCount++;
            }
            return true;
        }

        /**
         * terminatePartial takes no arguments and returns the partial state once
         * iterate has consumed its rows; it plays the role of a Hadoop Combiner.
         */
        public AvgState terminatePartial() {
            return state.mCount == 0 ? null : state;
        }

        /**
         * merge receives the result of terminatePartial and merges it into the
         * current state. Its return type is boolean.
         */
        public boolean merge(AvgState avgState) {
            if (avgState != null) {
                state.mCount += avgState.mCount;
                state.mSum += avgState.mSum;
            }
            return true;
        }

        /**
         * terminate returns the final aggregated result.
         */
        public Double terminate() {
            return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount);
        }
    }
}

Implementing sum (using the GenericUDAF interfaces):
package com.xxx.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.DoubleWritable;

public class Sum extends AbstractGenericUDAFResolver {

    /**
     * Returns the evaluator that implements the aggregation logic.
     */
    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        // Validate the number of arguments (the argument types could also be checked here)
        if (info.length != 1) {
            throw new UDFArgumentLengthException("Invalid number of arguments: exactly one argument is expected");
        }
        return new GenericEvaluate();
    }

    // Aggregation logic
    public static class GenericEvaluate extends GenericUDAFEvaluator {

        private PrimitiveObjectInspector input;
        private DoubleWritable result;   // holds the returned result

        // Custom AggregationBuffer holding the intermediate state
        public static class MyAggregationBuffer implements AggregationBuffer {
            Double sum;
        }

        @Override  // declare the return type
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            super.init(m, parameters);
            result = new DoubleWritable(0);
            input = (PrimitiveObjectInspector) parameters[0];
            // The aggregation returns a writable double
            return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
        }

        @Override  // create a new aggregation buffer (once per map task)
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            MyAggregationBuffer myAggregationBuffer = new MyAggregationBuffer();
            reset(myAggregationBuffer);  // reset the aggregated value
            return myAggregationBuffer;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            MyAggregationBuffer newAgg = (MyAggregationBuffer) agg;
            newAgg.sum = 0.0;
        }

        @Override  // fold one input row into the buffer
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            MyAggregationBuffer myAgg = (MyAggregationBuffer) agg;
            double inputNum = PrimitiveObjectInspectorUtils.getDouble(parameters[0], input);
            myAgg.sum += inputNum;
        }

        @Override  // return the partial (map-side) result
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            MyAggregationBuffer newAgg = (MyAggregationBuffer) agg;
            result.set(newAgg.sum);
            return result;
        }

        @Override  // merge a partial result into the buffer
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            double inputNum = PrimitiveObjectInspectorUtils.getDouble(partial, input);
            MyAggregationBuffer newAgg = (MyAggregationBuffer) agg;
            newAgg.sum += inputNum;
        }

        @Override  // return the final result
        public Object terminate(AggregationBuffer agg) throws HiveException {
            MyAggregationBuffer aggregationBuffer = (MyAggregationBuffer) agg;
            result.set(aggregationBuffer.sum);
            return result;
        }
    }
}
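The evaluator can be sanity-checked off-cluster by running it in COMPLETE mode (iterate straight to terminate, with no partial/merge phase). A minimal sketch, assuming the Sum class above (the test class name is only illustrative):

package com.xxx.udf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class SumLocalTest {
    public static void main(String[] args) throws Exception {
        GenericUDAFEvaluator eval = new Sum.GenericEvaluate();

        // COMPLETE mode: iterate() over raw rows, then terminate() - no partial/merge phase
        ObjectInspector[] inputOIs = { PrimitiveObjectInspectorFactory.javaDoubleObjectInspector };
        eval.init(GenericUDAFEvaluator.Mode.COMPLETE, inputOIs);

        GenericUDAFEvaluator.AggregationBuffer buf = eval.getNewAggregationBuffer();
        for (double v : new double[] { 1.0, 2.5, 3.5 }) {
            eval.iterate(buf, new Object[] { v });   // autoboxed to Double
        }
        System.out.println(eval.terminate(buf));      // expected: 7.0
    }
}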
3. Package
mvn clean package
4. Create a temporary function
add jar /home/xxx/yf/my_avg.jar;
create temporary function my_avg as 'com.xxx.udf.Avg';
select my_avg(col) from dual;   -- replace col with a numeric column
drop temporary function my_avg;
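In a real query the aggregate is normally applied per group. A usage sketch, assuming a hypothetical emp table with dept and salary columns:

select dept, my_avg(salary) as avg_salary
from emp
group by dept;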
III. Writing a UDTF
1. Steps
1. Extend GenericUDTF. 2. Override the initialize and process methods: initialize validates the arguments and defines the output schema; process handles each input row and builds the result; forward emits each output row.
2. Example
Output each element of a comma-separated string together with its 1-based index, e.g. 'a,c,b' -> a,1  c,2  b,3
package com.xxx.udf;

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UDTFExplode extends GenericUDTF {

    @Override
    public void close() throws HiveException {
        // nothing to clean up
    }

    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        // Validate the arguments
        if (args.length != 1) {
            throw new UDFArgumentLengthException("UDTFExplode takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("UDTFExplode takes a string as a parameter");
        }

        // Define the output schema: two string columns, col1 and col2
        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("col1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        String input = args[0].toString();
        String[] elements = input.split(",");
        for (int i = 0; i < elements.length; i++) {
            // Each call to forward emits one output row: (element, 1-based index)
            forward(new String[] { elements[i], String.valueOf(i + 1) });
        }
    }
}
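The UDTF can also be exercised off-cluster by plugging in a Collector that simply prints every forwarded row. A minimal sketch (the test class name is only illustrative):

package com.xxx.udf;

import java.util.Arrays;

import org.apache.hadoop.hive.ql.udf.generic.Collector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class UDTFExplodeLocalTest {
    public static void main(String[] args) throws Exception {
        UDTFExplode udtf = new UDTFExplode();

        // One string argument, as the UDTF expects
        ObjectInspector[] argOIs = { PrimitiveObjectInspectorFactory.javaStringObjectInspector };
        udtf.initialize(argOIs);

        // Collect every row the UDTF forwards and print it
        udtf.setCollector(new Collector() {
            public void collect(Object row) {
                System.out.println(Arrays.toString((Object[]) row));
            }
        });

        udtf.process(new Object[] { "a,c,b" });
        // expected output:
        // [a, 1]
        // [c, 2]
        // [b, 3]
        udtf.close();
    }
}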
3. Package
mvn clean package
4. Create a temporary function
add jar /home/xxx/yf/str_index.jar;
create temporary function str_index as 'com.xxx.udf.UDTFExplode';
select str_index("a,c,b") from dual;
drop temporary function str_index;
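Called directly in the select list, a UDTF cannot be mixed with other columns; in practice it is usually applied through LATERAL VIEW. A usage sketch, assuming a hypothetical table t with a string column s:

select t.s, idx.col1, idx.col2
from t
lateral view str_index(t.s) idx as col1, col2;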
pom.xml (the full project pom, shown for reference; it also carries Spark, Flink, and HBase dependencies unrelated to the UDF code):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>spark-hive</groupId>
    <artifactId>spark-hive</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.1.0.9</spark.version>
        <spark.artifactId.version>2.11</spark.artifactId.version>
    </properties>

    <dependencies>
        <dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.1.1</version><type>jar</type></dependency>
        <dependency><groupId>org.apache.commons</groupId><artifactId>commons-lang3</artifactId><version>3.1</version></dependency>
        <dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.2</version></dependency>
        <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.21</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.1.0</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.1.0</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.1.0</version></dependency>
        <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.2</version></dependency>
        <dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.1.0</version></dependency>
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.29</version></dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${spark.artifactId.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- flink dependencies -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.5.0</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>1.5.0</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>1.5.0</version></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-wikiedits_2.11</artifactId><version>1.5.0</version></dependency>
        <!-- hbase dependencies -->
        <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase</artifactId><version>0.98.8-hadoop2</version><type>pom</type></dependency>
        <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>0.98.8-hadoop2</version></dependency>
        <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>0.98.8-hadoop2</version></dependency>
        <dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>0.98.8-hadoop2</version></dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.8</version>
                <executions>
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals><goal>add-source</goal></goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-sources</phase>
                        <goals><goal>add-test-source</goal></goals>
                        <configuration>
                            <sources><source>src/test/scala</source></sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>2.11.8</scalaVersion>
                    <sourceDir>src/main/scala</sourceDir>
                    <jvmArgs>
                        <jvmArg>-Xms64m</jvmArg>
                        <jvmArg>-Xmx1024m</jvmArg>
                    </jvmArgs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-release-plugin</artifactId>
                <version>2.5.3</version>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <configuration><skip>false</skip></configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <minimizeJar>false</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
            <resource>
                <directory>src/main/resources/${profiles.active}</directory>
            </resource>
        </resources>
        <!-- Fix "Plugin execution not covered by lifecycle configuration" warnings in Eclipse m2e -->
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.codehaus.mojo</groupId>
                                        <artifactId>build-helper-maven-plugin</artifactId>
                                        <versionRange>[1.8,)</versionRange>
                                        <goals>
                                            <goal>add-source</goal>
                                            <goal>add-test-source</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action><ignore></ignore></action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.scala-tools</groupId>
                                        <artifactId>maven-scala-plugin</artifactId>
                                        <versionRange>[1.8,)</versionRange>
                                        <goals>
                                            <goal>compile</goal>
                                            <goal>add-source</goal>
                                            <goal>testCompile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action><ignore></ignore></action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
Reposted from: https://www.cnblogs.com/yin-fei/p/10748527.html