5、Hive的自定义UDF函数
生活随笔
收集整理的這篇文章主要介紹了
5、Hive的自定义UDF函数
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
2019獨角獸企業重金招聘Python工程師標準>>>
1、pom.xml引入依賴及打包
<dependencies><dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.1.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0</version></dependency> </dependencies><build><plugins><!-- 配置java插件,指定版本 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><encoding>UTF-8</encoding><source>1.8</source><target>1.8</target><showWarnings>true</showWarnings></configuration></plugin></plugins> </build>2、對單個字段,或者多個字段進行處理
import utils.CommonUtils; import org.apache.hadoop.hive.ql.exec.UDF;/*** [@Author](https://my.oschina.net/arthor) liufu* @CreateTime 2017/5/4 14:13* @Descrition*/ public class AllTracksUDF extends UDF {// 重載方法// 處理Int類型字段// 及時要插入的表中字段為int、bigint類型等,都可以用string類型插入進去// int類型數據,在傳入參數的時候直接傳遞數字即可,比如:evaluate(power, 1)public Integer evaluate(String column, int columnType) {String longValue = getStringValue(column);if(longValue != null){return Integer.parseInt(longValue);}return null;}// 處理Long類型字段,包括時間// long類型參數,傳遞columnType的時候要加上"L", 比如:evaluate(startTime, 1L)public Long evaluate(String column, long columnType) {String longValue = getStringValue(column);if(longValue != null){// 1表示是時間,而時間為秒,要轉化為毫秒,*1000if(columnType == 1){return Long.parseLong(longValue) * 1000;}return Long.parseLong(longValue);}return null;}// 處理String類型字段public String evaluate(String column) {return getStringValue(column);}// 處理兩個字段,比如xpoint 和 ypoing的轉換,判空和拼接public String evaluate(String column1, String column2) {return convertLatLon(column1, column2);}/*** [@param](https://my.oschina.net/u/2303379) value* [@return](https://my.oschina.net/u/556800)* 獲取string類型的字段,判空處理*/private String getStringValue(String value) {if (value != null && !"MULL".equalsIgnoreCase(value) && !"NULL".equalsIgnoreCase(value) && value.trim().length() != 0) {return value;}return null;}/*** @param lat* @param lon* @return* 將經度、維度拼接*/private String convertLatLon(String lat, String lon) {if (lat == null | lon == null || "MULL".equalsIgnoreCase(lat) || "MULL".equalsIgnoreCase(lon) || "NULL".equalsIgnoreCase(lat) || "NULL".equalsIgnoreCase(lon) || "0".equalsIgnoreCase(lat) || "0".equalsIgnoreCase(lon)) {return "0,0";}// 經緯度轉換if (CommonUtils.parseDouble(lat) > CommonUtils.parseDouble(lon)) {return lon + "," + lat;} else {return lat + "," + lon;}} }3、利用map函數,將一條數據組裝成Map,然后傳遞進來
/*** 讀取hive的數據,然后將每條數據組合成一個json字符串,通過下面udf函數方法發送到kafka* <p>* 通過測試驗證,Hive2KafkaUDF類在每次mr任務中,只會創建一次,所以producer可以做成單例** @Author liufu* @E-mail: 1151224929@qq.com* @CreateTime 2019/6/5 18:06*/ @Description(name = "hive2kafka", value = "_FUNC_(string, topic, map<string,string>) - Return ret ") public class Hive2KafkaUDF extends UDF {private static Gson gson = new GsonBuilder().serializeNulls().create();private KafkaProducer<String, String> producer;public boolean evaluate(String kafkaParams, String topic, Map<String, String> dataMap) {KafkaProducer producerTemp = getProducer(kafkaParams);producerTemp.send(new ProducerRecord(topic, null, gson.toJson(dataMap)));return true;}private KafkaProducer getProducer(String kafkaParams) {if (producer == null) {synchronized ("getProducer") {if (producer == null) {Properties props = gson.fromJson(kafkaParams, Properties.class);producer = new KafkaProducer<>(props);}}}return producer;} }-
3.2、 如何使用這個UDF
利用map函數將數據組裝成一個Map對象select hive2kafka("{'bootstrap.servers': 'gawh243:9092', 'acks': 'all', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}", 'together001', // map函數,左邊的name是最終的字段值,功能等同于username as namemap('name',username,'age',age)) from qwrenzixing.visual_deduction_kinship_relation
4、創建臨時函數
-
4.1、打包成jar包,可以放在任何能夠訪問到的地方,比如hdfs://,本地文件系統file://
-
4.2、加載jar
hive> add jar /root/hive2kafka.udf-1.0.jar;Added [/root/elasticsearce-hadoop/hive2kafka.udf-1.0.jar] to class pathAdded resources: [/root/elasticsearce-hadoop/hive2kafka.udf-1.0.jar]hive> create temporary function hive2kafka as 'com.study.Hive2KafkaUDF';hive> create temporary function allTracksudf as 'com.study.AllTracksUDF';或者直接使用遠端jar來創建,不一定需要先add jarhive> create temporary function hive2kafka as 'com.study.Hive2KafkaUDF' using jar 'hdfs://rsb:8082/udf/hive2es.udf-1.0.jar'
5、使用臨時函數
-
5.1、第一個函數
select allTracksudf(create_time, 1L) as create_time from t_a;
-
5.2、第二個函數
利用map函數將數據組裝成一個Map對象select hive2kafka("{'bootstrap.servers': 'gawh243:9092', 'acks': 'all', 'key.serializer': 'org.apache.kafka.common.serialization.StringSerializer', 'value.serializer': 'org.apache.kafka.common.serialization.StringSerializer'}", 'together001', // map函數,左邊的name是最終的字段值,功能等同于username as namemap('name',username,'age',age)) from testDb.t_b;
轉載于:https://my.oschina.net/liufukin/blog/798533
總結
以上是生活随笔為你收集整理的5、Hive的自定义UDF函数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nginx log_format详解
- 下一篇: 计算机教室电脑无法启动,电子教室教师端不