日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hive自定义UDF UDAF UDTF

發布時間:2024/1/17 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hive自定义UDF UDAF UDTF 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Hive是一種構建在Hadoop上的數據倉庫,Hive把SQL查詢轉換為一系列在Hadoop集群中運行的MapReduce作業,是MapReduce更高層次的抽象,不用編寫具體的MapReduce方法。Hive將數據組織為表,這就使得HDFS上的數據有了結構,元數據即表的模式,都存儲在名為metastore的數據庫中。

? ? ? 可以在hive的外殼環境中直接使用dfs訪問hadoop的文件系統命令。

? ? ? Hive可以允許用戶編寫自己定義的函數UDF,來在查詢中使用。Hive中有3種UDF:

? ? ? UDF: 操作單個數據行,產生單個數據行;

? ? ? UDAF: 操作多個數據行,產生一個數據行。

? ? ? UDTF: 操作一個數據行,產生多個數據行一個表作為輸出。

? ? ? 用戶構建的UDF使用過程如下:

? ? ? 第一步:繼承UDF或者UDAF或者UDTF,實現特定的方法。

? ? ? 第二步:將寫好的類打包為jar。如hivefirst.jar.

? ? ? 第三步:進入到Hive外殼環境中,利用add jar /home/hadoop/hivefirst.jar.注冊該jar文件

? ? ? 第四步:為該類起一個別名,create temporary function mylength as 'com.whut.StringLength';這里注意UDF只是為這個Hive會話臨時定義的。

? ? ? 第五步:在select中使用mylength();

?

?

自定義UDF

? ??1.繼承org.apache.hadoop.hive.ql.exec.UDF

? ? ? ?2.實現evaluate函數,evaluate函數支持重載

?
  • package cn.sina.stat.hive.udf;

  • import java.util.Arrays;

  • import org.apache.hadoop.hive.ql.exec.UDF;

  • public final class SortFieldContent extends UDF {

  • public String evaluate( final String str, String delimiter) {

  • if (str == null ) {

  • return null ;

  • }

  • if (delimiter == null) {

  • delimiter = "," ;

  • }

  • String[] strs = str.split(delimiter);

  • Arrays. sort(strs);

  • String result = "" ;

  • for (int i = 0; i < strs. length; i++) {

  • if (result.length() > 0) {

  • result.concat(delimiter);

  • }

  • result.concat(strs[i]);

  • }

  • return result;

  • }

  • ?
  • public String evaluate( final String str, String delimiter, String order) {

  • if (str == null ) {

  • return null ;

  • }

  • if (delimiter == null) {

  • delimiter = "," ;

  • }

  • if (order != null && order.toUpperCase().equals( "ASC" )) {

  • return evaluate(str, delimiter);

  • } else {

  • String[] strs = str.split(delimiter);

  • Arrays. sort(strs);

  • String result = "" ;

  • for (int i = strs. length - 1; i >= 0; i--) {

  • if (result.length() > 0) {

  • result.concat(delimiter);

  • }

  • result.concat(strs[i]);

  • }

  • return result;

  • }

  • }

  • }

  • 注意事項:

    ? ?1,一個用戶UDF必須繼承org.apache.hadoop.hive.ql.exec.UDF;

    ? ?2,一個UDF必須要包含有evaluate()方法,但是該方法并不存在于UDF中。evaluate的參數個數以及類型都是用戶自己定義的。在使用的時候,Hive會調用UDF的evaluate()方法。

    ?

    自定義UDAF

    1.函數類繼承org.apache.hadoop.hive.ql.exec.UDAF

    ? ?內部類實現接口org.apache.hadoop.hive.ql.exec.UDAFEvaluator

    2.Evaluator需要實現 init、iterate、terminatePartial、merge、terminate這幾個函數

    ? ?具體執行過程如圖:

    ?
  • package cn.sina.stat.hive.udaf;

  • import java.util.Arrays;

  • import org.apache.hadoop.hive.ql.exec.UDAF;

  • import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

  • ?
  • public class ConcatClumnGroupByKeyWithOrder extends UDAF {

  • public static class ConcatUDAFEvaluator implements UDAFEvaluator {

  • public static class PartialResult {

  • String result;

  • String delimiter;

  • String order;

  • }

  • ?
  • private PartialResult partial;

  • ?
  • public void init() {

  • partial = null;

  • }

  • ?
  • public boolean iterate(String value, String delimiter, String order) {

  • ?
  • if (value == null) {

  • return true;

  • }

  • if (partial == null) {

  • partial = new PartialResult();

  • partial.result = new String("");

  • if (delimiter == null || delimiter.equals("")) {

  • partial.delimiter = new String(",");

  • } else {

  • partial.delimiter = new String(delimiter);

  • }

  • if (order != null

  • && (order.toUpperCase().equals("ASC") || order

  • .toUpperCase().equals("DESC"))) {

  • partial.order = new String(order);

  • } else {

  • partial.order = new String("ASC");

  • }

  • ?
  • }

  • if (partial.result.length() > 0) {

  • partial.result = partial.result.concat(partial.delimiter);

  • }

  • ?
  • partial.result = partial.result.concat(value);

  • ?
  • return true;

  • }

  • ?
  • public PartialResult terminatePartial() {

  • return partial;

  • }

  • ?
  • public boolean merge(PartialResult other) {

  • if (other == null) {

  • return true;

  • }

  • if (partial == null) {

  • partial = new PartialResult();

  • partial.result = new String(other.result);

  • partial.delimiter = new String(other.delimiter);

  • partial.order = new String(other.order);

  • } else {

  • if (partial.result.length() > 0) {

  • partial.result = partial.result.concat(partial.delimiter);

  • }

  • partial.result = partial.result.concat(other.result);

  • }

  • return true;

  • }

  • ?
  • public String terminate() {

  • String[] strs = partial.result.split(partial.delimiter);

  • Arrays.sort(strs);

  • String result = new String("");

  • if (partial.order.equals("DESC")) {

  • for (int i = strs.length - 1; i >= 0; i--) {

  • if (result.length() > 0) {

  • result.concat(partial.delimiter);

  • }

  • result.concat(strs[i]);

  • }

  • } else {

  • for (int i = 0; i < strs.length; i++) {

  • if (result.length() > 0) {

  • result.concat(partial.delimiter);

  • }

  • result.concat(strs[i]);

  • }

  • }

  • return new String(result);

  • }

  • }

  • }

  • 注意事項:

    ? ? 1,用戶的UDAF必須繼承了org.apache.hadoop.hive.ql.exec.UDAF;

    ? ? 2,用戶的UDAF必須包含至少一個實現了org.apache.hadoop.hive.ql.exec的靜態類,諸如常見的實現了 UDAFEvaluator。

    ? ? 3,一個計算函數必須實現的5個方法的具體含義如下:

    ? ? init():主要是負責初始化計算函數并且重設其內部狀態,一般就是重設其內部字段。一般在靜態類中定義一個內部字段來存放最終的結果。

    ? ? iterate():每一次對一個新值進行聚集計算時候都會調用該方法,計算函數會根據聚集計算結果更新內部狀態。當輸入值合法或者正確計算了,則就返回true。

    ? ? terminatePartial():Hive需要部分聚集結果的時候會調用該方法,必須要返回一個封裝了聚集計算當前狀態的對象。

    ? ? merge():Hive進行合并一個部分聚集和另一個部分聚集的時候會調用該方法。

    ? ? terminate():Hive最終聚集結果的時候就會調用該方法。計算函數需要把狀態作為一個值返回給用戶。

    ? ? 4,部分聚集結果的數據類型和最終結果的數據類型可以不同。

    ?

    自定義UDTF

    1.繼承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
    2.實現initialize, process, close三個方法
    ???? a.initialize初始化驗證,返回字段名和字段類型
    ???? b.初始化完成后,調用process方法,對傳入的參數進行處理,通過forword()方法把結果返回
    ???? c.最后調用close()方法進行清理工作

    ?
  • package cn.sina.stat.hive.udtf;

  • import java.util.ArrayList;

  • import java.util.Arrays;

  • import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

  • import org.apache.hadoop.hive.ql.exec.UDFArgumentException;

  • import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;

  • import org.apache.hadoop.hive.ql.metadata.HiveException;

  • import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

  • import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

  • import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

  • import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

  • ?
  • public class SortFieldExplodeToPair extends GenericUDTF {

  • ?
  • @Override

  • public void close() throws HiveException {

  • // TODO Auto-generated method stub

  • }

  • ?
  • @Override

  • public StructObjectInspector initialize(ObjectInspector[] args)

  • throws UDFArgumentException {

  • if (args.length != 3) {

  • throw new UDFArgumentLengthException(

  • "SortFieldExplodeToPair takes only three argument");

  • }

  • if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {

  • throw new UDFArgumentException(

  • "SortFieldExplodeToPair takes string as first parameter");

  • }

  • if (args[1].getCategory() != ObjectInspector.Category.PRIMITIVE) {

  • throw new UDFArgumentException(

  • "SortFieldExplodeToPair takes string as second parameter");

  • }

  • if (args[2].getCategory() != ObjectInspector.Category.PRIMITIVE) {

  • throw new UDFArgumentException(

  • "SortFieldExplodeToPair takes string as third parameter");

  • }

  • if (args[2] == null

  • || !(args[2].toString().toUpperCase().equals("ASC") || args[2]

  • .toString().toUpperCase().equals("DESC"))) {

  • throw new UDFArgumentException(

  • "SortFieldExplodeToPair third parameter must be \"ASC\" or \"DESC\"");

  • }

  • ?
  • ArrayList<String> fieldNames = new ArrayList<String>();

  • ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

  • fieldNames.add("col1");

  • fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

  • ?
  • return ObjectInspectorFactory.getStandardStructObjectInspector(

  • fieldNames, fieldOIs);

  • }

  • ?
  • private final String[] forwardStr = new String[1];

  • ?
  • @Override

  • public void process(Object[] args) throws HiveException {

  • String input = args[0].toString();

  • String delimiter = args[1].toString();

  • String order = args[2].toString();

  • String[] strList = input.split(delimiter);

  • Arrays.sort(strList);

  • if (strList.length > 1) {

  • if (order.toUpperCase().equals("DESC")) {

  • for (int i = strList.length - 1; i > 0; i--) {

  • forwardStr[0] = strList[i].concat(delimiter).concat(

  • strList[i - 1]);

  • forward(forwardStr);

  • }

  • } else {

  • for (int i = 0; i < strList.length - 1; i++) {

  • forwardStr[0] = strList[i].concat(delimiter).concat(

  • strList[i + 1]);

  • forward(forwardStr);

  • }

  • }

  • } else {

  • forward(strList);

  • }

  • }

  • ?
  • 總結

    以上是生活随笔為你收集整理的Hive自定义UDF UDAF UDTF的全部內容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。