Hive自定义UDF UDAF UDTF
Hive是一種構建在Hadoop上的數據倉庫,Hive把SQL查詢轉換為一系列在Hadoop集群中運行的MapReduce作業,是MapReduce更高層次的抽象,不用編寫具體的MapReduce方法。Hive將數據組織為表,這就使得HDFS上的數據有了結構,元數據即表的模式,都存儲在名為metastore的數據庫中。
? ? ? 可以在hive的外殼環境中直接使用dfs訪問hadoop的文件系統命令。
? ? ? Hive可以允許用戶編寫自己定義的函數UDF,來在查詢中使用。Hive中有3種UDF:
? ? ? UDF: 操作單個數據行,產生單個數據行;
? ? ? UDAF: 操作多個數據行,產生一個數據行。
? ? ? UDTF: 操作一個數據行,產生多個數據行一個表作為輸出。
? ? ? 用戶構建的UDF使用過程如下:
? ? ? 第一步:繼承UDF或者UDAF或者UDTF,實現特定的方法。
? ? ? 第二步:將寫好的類打包為jar。如hivefirst.jar.
? ? ? 第三步:進入到Hive外殼環境中,利用add jar /home/hadoop/hivefirst.jar.注冊該jar文件
? ? ? 第四步:為該類起一個別名,create temporary function mylength as 'com.whut.StringLength';這里注意UDF只是為這個Hive會話臨時定義的。
? ? ? 第五步:在select中使用mylength();
?
?
自定義UDF
? ??1.繼承org.apache.hadoop.hive.ql.exec.UDF
? ? ? ?2.實現evaluate函數,evaluate函數支持重載
?package cn.sina.stat.hive.udf;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.exec.UDF;
public final class SortFieldContent extends UDF {
public String evaluate( final String str, String delimiter) {
if (str == null ) {
return null ;
}
if (delimiter == null) {
delimiter = "," ;
}
String[] strs = str.split(delimiter);
Arrays. sort(strs);
String result = "" ;
for (int i = 0; i < strs. length; i++) {
if (result.length() > 0) {
result.concat(delimiter);
}
result.concat(strs[i]);
}
return result;
}
public String evaluate( final String str, String delimiter, String order) {
if (str == null ) {
return null ;
}
if (delimiter == null) {
delimiter = "," ;
}
if (order != null && order.toUpperCase().equals( "ASC" )) {
return evaluate(str, delimiter);
} else {
String[] strs = str.split(delimiter);
Arrays. sort(strs);
String result = "" ;
for (int i = strs. length - 1; i >= 0; i--) {
if (result.length() > 0) {
result.concat(delimiter);
}
result.concat(strs[i]);
}
return result;
}
}
}
注意事項:
? ?1,一個用戶UDF必須繼承org.apache.hadoop.hive.ql.exec.UDF;
? ?2,一個UDF必須要包含有evaluate()方法,但是該方法并不存在于UDF中。evaluate的參數個數以及類型都是用戶自己定義的。在使用的時候,Hive會調用UDF的evaluate()方法。
?
自定義UDAF
1.函數類繼承org.apache.hadoop.hive.ql.exec.UDAF
? ?內部類實現接口org.apache.hadoop.hive.ql.exec.UDAFEvaluator
2.Evaluator需要實現 init、iterate、terminatePartial、merge、terminate這幾個函數
? ?具體執行過程如圖:
?package cn.sina.stat.hive.udaf;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
public class ConcatClumnGroupByKeyWithOrder extends UDAF {
public static class ConcatUDAFEvaluator implements UDAFEvaluator {
public static class PartialResult {
String result;
String delimiter;
String order;
}
private PartialResult partial;
public void init() {
partial = null;
}
public boolean iterate(String value, String delimiter, String order) {
if (value == null) {
return true;
}
if (partial == null) {
partial = new PartialResult();
partial.result = new String("");
if (delimiter == null || delimiter.equals("")) {
partial.delimiter = new String(",");
} else {
partial.delimiter = new String(delimiter);
}
if (order != null
&& (order.toUpperCase().equals("ASC") || order
.toUpperCase().equals("DESC"))) {
partial.order = new String(order);
} else {
partial.order = new String("ASC");
}
}
if (partial.result.length() > 0) {
partial.result = partial.result.concat(partial.delimiter);
}
partial.result = partial.result.concat(value);
return true;
}
public PartialResult terminatePartial() {
return partial;
}
public boolean merge(PartialResult other) {
if (other == null) {
return true;
}
if (partial == null) {
partial = new PartialResult();
partial.result = new String(other.result);
partial.delimiter = new String(other.delimiter);
partial.order = new String(other.order);
} else {
if (partial.result.length() > 0) {
partial.result = partial.result.concat(partial.delimiter);
}
partial.result = partial.result.concat(other.result);
}
return true;
}
public String terminate() {
String[] strs = partial.result.split(partial.delimiter);
Arrays.sort(strs);
String result = new String("");
if (partial.order.equals("DESC")) {
for (int i = strs.length - 1; i >= 0; i--) {
if (result.length() > 0) {
result.concat(partial.delimiter);
}
result.concat(strs[i]);
}
} else {
for (int i = 0; i < strs.length; i++) {
if (result.length() > 0) {
result.concat(partial.delimiter);
}
result.concat(strs[i]);
}
}
return new String(result);
}
}
}
注意事項:
? ? 1,用戶的UDAF必須繼承了org.apache.hadoop.hive.ql.exec.UDAF;
? ? 2,用戶的UDAF必須包含至少一個實現了org.apache.hadoop.hive.ql.exec的靜態類,諸如常見的實現了 UDAFEvaluator。
? ? 3,一個計算函數必須實現的5個方法的具體含義如下:
? ? init():主要是負責初始化計算函數并且重設其內部狀態,一般就是重設其內部字段。一般在靜態類中定義一個內部字段來存放最終的結果。
? ? iterate():每一次對一個新值進行聚集計算時候都會調用該方法,計算函數會根據聚集計算結果更新內部狀態。當輸入值合法或者正確計算了,則就返回true。
? ? terminatePartial():Hive需要部分聚集結果的時候會調用該方法,必須要返回一個封裝了聚集計算當前狀態的對象。
? ? merge():Hive進行合并一個部分聚集和另一個部分聚集的時候會調用該方法。
? ? terminate():Hive最終聚集結果的時候就會調用該方法。計算函數需要把狀態作為一個值返回給用戶。
? ? 4,部分聚集結果的數據類型和最終結果的數據類型可以不同。
?
自定義UDTF
1.繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
2.實現initialize, process, close三個方法
???? a.initialize初始化驗證,返回字段名和字段類型
???? b.初始化完成后,調用process方法,對傳入的參數進行處理,通過forword()方法把結果返回
???? c.最后調用close()方法進行清理工作
package cn.sina.stat.hive.udtf;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
public class SortFieldExplodeToPair extends GenericUDTF {
@Override
public void close() throws HiveException {
// TODO Auto-generated method stub
}
@Override
public StructObjectInspector initialize(ObjectInspector[] args)
throws UDFArgumentException {
if (args.length != 3) {
throw new UDFArgumentLengthException(
"SortFieldExplodeToPair takes only three argument");
}
if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException(
"SortFieldExplodeToPair takes string as first parameter");
}
if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException(
"SortFieldExplodeToPair takes string as second parameter");
}
if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentException(
"SortFieldExplodeToPair takes string as third parameter");
}
if (args[2] == null
|| !(args[2].toString().toUpperCase().equals("ASC") || args[2]
.toString().toUpperCase().equals("DESC"))) {
throw new UDFArgumentException(
"SortFieldExplodeToPair third parameter must be \"ASC\" or \"DESC\"");
}
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
fieldNames.add("col1");
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
return ObjectInspectorFactory.getStandardStructObjectInspector(
fieldNames, fieldOIs);
}
private final String[] forwardStr = new String[1];
@Override
public void process(Object[] args) throws HiveException {
String input = args[0].toString();
String delimiter = args[1].toString();
String order = args[2].toString();
String[] strList = input.split(delimiter);
Arrays.sort(strList);
if (strList.length > 1) {
if (order.toUpperCase().equals("DESC")) {
for (int i = strList.length - 1; i > 0; i--) {
forwardStr[0] = strList[i].concat(delimiter).concat(
strList[i - 1]);
forward(forwardStr);
}
} else {
for (int i = 0; i < strList.length - 1; i++) {
forwardStr[0] = strList[i].concat(delimiter).concat(
strList[i + 1]);
forward(forwardStr);
}
}
} else {
forward(strList);
}
}
總結
以上是生活随笔為你收集整理的Hive自定义UDF UDAF UDTF的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深入浅出学Hive:Hive体系结构
- 下一篇: hive大数据倾斜总结