日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

数据仓库—stg层_数据仓库之Hive快速入门 - 离线实时数仓架构

發(fā)布時間:2025/3/21 编程问答 24 豆豆
生活随笔 收集整理的這篇文章主要介紹了 数据仓库—stg层_数据仓库之Hive快速入门 - 离线实时数仓架构 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

數據倉庫VS數據庫

數據倉庫的定義:

數據倉庫是將多個數據源的數據經過ETL(Extract(抽取)、Transform(轉換)、Load(加載))理之后,按照一定的主題集成起來提供決策支持和聯機分析應用的結構化數據環(huán)境

數據倉庫VS數據庫:

數據庫是面向事務的設計,數據倉庫是面向主題設計的

數據庫一般存儲在線交易數據,數據倉庫存儲的一般是歷史數據

數據庫設計是避免冗余,采用三范式的規(guī)則來設計,數據倉庫在設計是有意引入冗余,采用反范式的方式來設計

OLTP VS OLAP:

聯機事務處理OLTP是傳統的關系型數據庫的主要應用,主要是基本的、日常的事務處理,例如銀行交易

聯機分析處理OLAP是數據倉庫系統的主要應用,支持復雜的分析操作,側重決策支持,并且提供直觀易懂的查詢結果

常規(guī)的數倉架構:

為什么建設數據倉庫:

各個業(yè)務數據存在不一致,數據關系混亂

業(yè)務系統一般針對于OLTP,而數據倉庫可以實現OLAP分析

數據倉庫是多源的復雜環(huán)境,可以對多個業(yè)務的數據進行統一分析

數據倉庫建設目標:

集成多源數據,數據來源和去向可追溯,梳理血緣關系

減少重復開發(fā),保存通用型中間數據,避免重復計算

屏蔽底層業(yè)務邏輯,對外提供一致的、 結構清晰的數據

如何實現:

實現通用型數據ETL工具

根據業(yè)務建立合理的數據分層模型

數據倉庫分層建設

數倉建設背景:

數據建設剛起步,大部分數據經過粗暴的數據接入后直接對接業(yè)務

數據建設發(fā)展到一定階段,發(fā)現數據的使用雜亂無章,各種業(yè)務都是從原始數據直接計算而得。

各種重復計算,嚴重浪費了計算資源,需要優(yōu)化性能

為什么進行數倉分層:

清晰數據結構:每個數據分層都有對應的作用域

數據血緣追蹤:對各層之間的數據表轉換進行跟蹤,建立血緣關系

減少重復開發(fā):規(guī)范數據分層,開發(fā)通用的中間層數據

屏蔽原始數據的異常:通過數據分層管控數據質量

屏蔽業(yè)務的影響:不必改一次業(yè)務就需要重新接入數據

復雜問題簡單化:將復雜的數倉架構分解成多個數據層來完成

常見的分層含義:

STG層

原始數據層:存儲原始數據,數據結構與采集數據一致

存儲周期:保存全部數據

表命名規(guī)范:stg_主題_表內容_分表規(guī)則

ODS層

數據操作層:對STG層數據進行初步處理,如去除臟數據,去除無用字段.

存儲周期:默認保留近30天數據

表命名規(guī)范:ods_主題_表內容_分表規(guī)則

DWD層

數據明細層:數據處理后的寬表,目標為滿足80%的業(yè)務需求

存儲周期:保留歷史至今所有的數據

表命名規(guī)范:dwd_業(yè)務描述時間粒度

DWS層

數據匯總層:匯總數據,解決數據匯總計算和數據完整度問題

存儲周期:保留歷史至今所有的數據

表命名規(guī)范:dws_業(yè)務描述_時間粒度_sum

DIM層

公共維度層:存儲公共的信息數據,用于DWD、DWS的數據關聯

存儲周期:按需存儲,一般保留歷史至今所有的數據

表命名規(guī)范:dim_維度描述

DM層

數據集市層:用于BI、多維分析、標簽、數據挖掘等

存儲周期:按需存儲,--般保留歷史至今所有的數據

表命名規(guī)范:dm_主題_表內容_分表規(guī)則

分層之間的數據流轉:

Hive是什么

Hive簡介:

Hive是基于Hadoop的數據倉庫工具,提供類SQL語法(HiveQL)

默認以MR作為計算引擎(也支持其他計算引擎,例如tez)、HDFS 作為存儲系統,提供超大數據集的計算/擴展能力

Hive是將數據映射成數據庫和一張張的表,庫和表的元數據信息一般存在關系型數據庫

Hive的簡單架構圖:

Hive VS Hadoop:

Hive數據存儲:Hive的數據是存儲在HDFS.上的,Hive的庫和表是對HDFS.上數據的映射

Hive元數據存儲:元數據存儲一般在外部關系庫( Mysql )與Presto Impala等共享

Hive語句的執(zhí)行過程:將HQL轉換為MapReduce任務運行

Hive與關系數據庫Mysql的區(qū)別

產品定位

Hive是數據倉庫,為海量數據的離線分析設計的,不支持OLTP(聯機事務處理所需的關鍵功能ACID,而更接近于OLAP(聯機分析技術)),適給離線處理大數據集。而MySQL是關系型數據庫,是為實時業(yè)務設計的。

可擴展性

Hive中的數據存儲在HDFS(Hadoop的分布式文件系統),metastore元數據一 般存儲在獨立的關系型數據庫中,而MySQL則是服務器本地的文件系統。因此Hive具有良好的可擴展性,數據庫由于ACID語義的嚴格限制,擴展性十分有限。

讀寫模式

Hive為讀時模式,數據的驗證則是在查詢時進行的,這有利于大數據集的導入,讀時模式使數據的加載非常迅速,數據的加載僅是文件復制或移動。MySQL為寫時模式,數據在寫入數據庫時對照模式檢查。寫時模式有利于提升查詢性能,因為數據庫可以對列進行索引。

數據更新

Hive是針對數據倉庫應用設計的,而數倉的內容是讀多寫少的,Hive中不支持對數據進行改寫,所有數據都是在加載的時候確定好的。而數據庫中的數據通常是需要經常進行修改的。

索引

Hive支持索引,但是Hive的索引與關系型數據庫中的索引并不相同,比如,Hive不支持主鍵或者外鍵。Hive提供了有限的索引功能,可以為-些字段建立索引,一張表的索引數據存儲在另外一張表中。由于數據的訪問延遲較高,Hive不適合在線數據查詢。數據庫在少星的特定條件的數據訪問中,索引可以提供較低的延遲。

計算模型

Hive默認使用的模型是MapReduce(也可以on spark、on tez),而MySQL使用的是自己設計的Executor計算模型

Hive安裝部署

參考:

Hive基本使用(上)Hive數據類型/分區(qū)/基礎語法

Hive數據類型:

基本數據類型:int、 float、 double、 string、 boolean、 bigint等

復雜類型:array、map、 struct

Hive分區(qū):

Hive將海量數據按某幾個字段進行分區(qū),查詢時不必加載全部數據

分區(qū)對應到HDFS就是HDFS的目錄.

分區(qū)分為靜態(tài)分區(qū)和動態(tài)分區(qū)兩種

Hive常用基礎語法:

USE DATABASE_NAME

CREATE DATABASE IF NOT EXISTS DB NAME

DESC DATABASE DB NAME

CREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORE AS TEXTFILE

SELECT * FROM TABLE NAME

ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME

寫個Python腳本生成一些測試數據:

import json

import random

import uuid

name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline')

hobby = ('reading', 'play', 'dancing', 'sing')

subject = ('math', 'chinese', 'english', 'computer')

data = []

for item in name:

scores = {key: random.randint(60, 100) for key in subject}

data.append("|".join([uuid.uuid4().hex, item, ','.join(

random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])]))

with open('test.csv', 'w') as f:

f.write('\n'.join(data))

執(zhí)行該腳本,生成測試數據文件:

[root@hadoop01 ~/py-script]# python3 gen_data.py

[root@hadoop01 ~/py-script]# ll -h

...

-rw-r--r--. 1 root root 745 11月 9 11:09 test.csv

[root@hadoop01 ~/py-script]#

我們可以看一下生成的數據:

[root@hadoop01 ~/py-script]# cat test.csv

f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77

...

數據以 | 符進行分割,前兩個字段都是string類型,第三個字段是array類型,第四個字段是map類型

創(chuàng)建測試用的數據庫:

0: jdbc:hive2://localhost:10000> create database hive_test;

No rows affected (0.051 seconds)

0: jdbc:hive2://localhost:10000> use hive_test;

No rows affected (0.06 seconds)

0: jdbc:hive2://localhost:10000>

創(chuàng)建測試表:

CREATE TABLE test(

user_id string,

user_name string,

hobby array,

scores map

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '|'

COLLECTION ITEMS TERMINATED BY ','

MAP KEYS TERMINATED BY ':'

LINES TERMINATED BY '\n';

將本地數據加載到Hive中:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test;

No rows affected (0.785 seconds)

0: jdbc:hive2://localhost:10000>

查詢數據:

Hive將HQL轉換為MapReduce的流程

了解了Hive中的SQL基本操作之后,我們來看看Hive是如何將SQL轉換為MapReduce任務的,整個轉換過程分為六個階段:

Antr定義SQL的語法規(guī)則,完成SQL詞法,語法解析,將SQL 轉化為抽象語法樹AST Tree

遍歷AST Tree,抽象出查詢的基本組成單元QueryBlock

遍歷QueryBlock,翻譯為執(zhí)行操作樹OperatorTree

邏輯層優(yōu)化器進行OperatorTree變換,合并不必要的ReduceSinkOperator,減少shufle數據量

遍歷OperatorTree,翻譯為MapReduce任務

物理層優(yōu)化器進行MapReduce任務的變換,生成最終的執(zhí)行計劃

與普通SQL一樣,我們可以通過在HQL前面加上explain關鍵字查看HQL的執(zhí)行計劃:

explain select * from test where id > 10 limit 1000

Hive會將這條語句解析成一個個的Operator,Operator就是Hive解析之后的最小單元,每個Operator其實都是對應一個MapReduce任務。例如,上面這條語句被Hive解析后,就是由如下Operator組成:

同時,Hive實現了優(yōu)化器對這些Operator的順序進行優(yōu)化,幫助我們提升查詢效率。Hive中的優(yōu)化器主要分為三類:

RBO(Rule-Based Optimizer):基于規(guī)則的優(yōu)化器

CBO(Cost-Based Optimizer):基于代價的優(yōu)化器,這是默認的優(yōu)化器

動態(tài)CBO:在執(zhí)行計劃生成的過程中動態(tài)優(yōu)化的方式

Hive基本使用(中)內部表/外部表/分區(qū)表/分桶表

內部表:

和傳統數據庫的Table概念類似,對應HDFS上存儲目錄,刪除表時,刪除元數據和表數據。內部表的數據,會存放在HDFS中的特定的位置中,可以通過配置文件指定。當刪除表時,數據文件也會一并刪除。適用于臨時創(chuàng)建的中間表。

外部表:

指向已經存在的HDFS數據,刪除時只刪除元數據信息。適用于想要在Hive之外使用表的數據的情況,當你刪除External Table時,只是刪除了表的元數據,它的數據并沒有被刪除。適用于數據多部門共享。建表時使用create external table。指定external關鍵字即可。

分區(qū)表:

Partition對應普通數據庫對Partition列的密集索引,將數據按照Partition列存儲到不同目錄,便于并行分析,減少數據量。分區(qū)表創(chuàng)建表的時候需要指定分區(qū)字段。

分區(qū)字段與普通字段的區(qū)別:分區(qū)字段會在HDFS表目錄下生成一個分區(qū)字段名稱的目錄,而普通字段則不會,查詢的時候可以當成普通字段來使用,一般不直接和業(yè)務直接相關。

分桶表:

對數據進行hash,放到不同文件存儲,方便抽樣和join查詢。可以將內部表,外部表和分區(qū)表進一步組織成桶表,可以將表的列通過Hash算法進一步分解成不同的文件存儲。

對于內部表和外部表的概念和應用場景我們很容易理解,我們需要重點關注一下分區(qū)表和分桶表。 我們?yōu)槭裁匆⒎謪^(qū)表和分桶表呢?HQL通過where子句來限制條件提取數據,那么與其遍歷一張大表,不如將這張大表拆分成多個小表,并通過合適的索引來掃描表中的一小部分,分區(qū)和分桶都是采用了這種理念。

分區(qū)會創(chuàng)建物理目錄,并且可以具有子目錄(通常會按照時間、地區(qū)分區(qū)),目錄名以 分區(qū)名=值 形式命名,例如:create_time=202011。分區(qū)名會作為表中的偽列,這樣通過where字句中加入分區(qū)的限制可以在僅掃描對應子目錄下的數據。通過 partitioned by (feld1 type, ...) 創(chuàng)建分區(qū)列。

分桶可以繼續(xù)在分區(qū)的基礎上再劃分小表,分桶根據哈希值來確定數據的分布(即MapReducer中的分區(qū)),比如分區(qū)下的一部分數據可以根據分桶再分為多個桶,這樣在查詢時先計算對應列的哈希值并計算桶號,只需要掃描對應桶中的數據即可。分桶通過clustered by(field) into n buckets創(chuàng)建。

接下來簡單演示下這幾種表的操作,首先將上一小節(jié)生成的測試數據文件上傳到hdfs中:

[root@hadoop01 ~]# hdfs dfs -mkdir /test

[root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test

[root@hadoop01 ~]# hdfs dfs -ls /test

Found 1 items

-rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /test/test.csv

[root@hadoop01 ~]#

內部表

建表SQL:

CREATE TABLE test_table(

user_id string,

user_name string,

hobby array,

scores map

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '|'

COLLECTION ITEMS TERMINATED BY ','

MAP KEYS TERMINATED BY ':'

LINES TERMINATED BY '\n';

將hdfs數據加載到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table;

No rows affected (0.169 seconds)

0: jdbc:hive2://localhost:10000>

查看創(chuàng)建的表存儲在hdfs的哪個目錄下:

0: jdbc:hive2://localhost:10000> show create table test_table;

+----------------------------------------------------+

| createtab_stmt |

+----------------------------------------------------+

| CREATE TABLE `test_table`( |

| `user_id` string, |

| `user_name` string, |

| `hobby` array, |

| `scores` map) |

| ROW FORMAT SERDE |

| 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' |

| WITH SERDEPROPERTIES ( |

| 'collection.delim'=',', |

| 'field.delim'='|', |

| 'line.delim'='\n', |

| 'mapkey.delim'=':', |

| 'serialization.format'='|') |

| STORED AS INPUTFORMAT |

| 'org.apache.hadoop.mapred.TextInputFormat' |

| OUTPUTFORMAT |

| 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |

| LOCATION |

| 'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' |

| TBLPROPERTIES ( |

| 'bucketing_version'='2', |

| 'transient_lastDdlTime'='1604893190') |

+----------------------------------------------------+

22 rows selected (0.115 seconds)

0: jdbc:hive2://localhost:10000>

在hdfs中可以查看到數據文件:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table

Found 1 items

-rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv

[root@hadoop01 ~]#

刪除表:

0: jdbc:hive2://localhost:10000> drop table test_table;

No rows affected (0.107 seconds)

0: jdbc:hive2://localhost:10000>

查看hdfs會發(fā)現該表所對應的存儲目錄也一并被刪除了:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/

Found 2 items

drwxr-xr-x - root supergroup 0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table

drwxr-xr-x - root supergroup 0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test

[root@hadoop01 ~]#

外部表

建表SQL,與內部表的區(qū)別就在于external關鍵字:

CREATE external TABLE external_table(

user_id string,

user_name string,

hobby array,

scores map

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '|'

COLLECTION ITEMS TERMINATED BY ','

MAP KEYS TERMINATED BY ':'

LINES TERMINATED BY '\n';

將數據文件加載到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table;

No rows affected (0.182 seconds)

0: jdbc:hive2://localhost:10000>

此時會發(fā)現hdfs中的數據文件會被移動到hive的目錄下:

[root@hadoop01 ~]# hdfs dfs -ls /test

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table

Found 1 items

-rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv

[root@hadoop01 ~]#

刪除表:

0: jdbc:hive2://localhost:10000> drop table external_table;

No rows affected (0.112 seconds)

0: jdbc:hive2://localhost:10000>

查看hdfs會發(fā)現該表所對應的存儲目錄仍然存在:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table

Found 1 items

-rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv

[root@hadoop01 ~]#

分區(qū)表

建表語句:

CREATE TABLE partition_table(

user_id string,

user_name string,

hobby array,

scores map

)

PARTITIONED BY (create_time string)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '|'

COLLECTION ITEMS TERMINATED BY ','

MAP KEYS TERMINATED BY ':'

LINES TERMINATED BY '\n';

將數據文件加載到Hive中,并指定分區(qū):

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011');

No rows affected (0.747 seconds)

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012');

No rows affected (0.347 seconds)

0: jdbc:hive2://localhost:10000>

執(zhí)行如下sql,可以從不同的分區(qū)統計結果:

0: jdbc:hive2://localhost:10000> select count(*) from partition_table;

+------+

| _c0 |

+------+

| 16 |

+------+

1 row selected (15.881 seconds)

0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011';

+------+

| _c0 |

+------+

| 8 |

+------+

1 row selected (14.639 seconds)

0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012';

+------+

| _c0 |

+------+

| 8 |

+------+

1 row selected (15.555 seconds)

0: jdbc:hive2://localhost:10000>

分區(qū)表在hdfs中的存儲結構:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table

Found 2 items

drwxr-xr-x - root supergroup 0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011

drwxr-xr-x - root supergroup 0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011

Found 1 items

-rw-r--r-- 1 root supergroup 745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv

[root@hadoop01 ~]#

分桶表

建表語句:

CREATE TABLE bucket_table(

user_id string,

user_name string,

hobby array,

scores map

)

clustered by (user_name) sorted by (user_name) into 2 buckets

ROW FORMAT DELIMITED

FIELDS TERMINATED BY '|'

COLLECTION ITEMS TERMINATED BY ','

MAP KEYS TERMINATED BY ':'

LINES TERMINATED BY '\n';

將test表中的數據插入到bucket_table中:

0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test;

No rows affected (17.393 seconds)

0: jdbc:hive2://localhost:10000>

抽樣查詢:

分桶表在hdfs的存儲目錄如下:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table

Found 2 items

-rw-r--r-- 1 root supergroup 465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0

-rw-r--r-- 1 root supergroup 281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0

[root@hadoop01 ~]#

Hive基本使用(下)內置函數/自定義函數/實現UDF

Hive常見內置函數:

字符串類型:concat、substr、 upper、 lower

時間類型:year、month、 day

復雜類型:size、 get_json_object

查詢引擎都自帶了一部分函數來幫助我們解決查詢過程當中一些復雜的數據計算或者數據轉換操作,但是有時候自帶的函數功能不能滿足業(yè)務的需要。這時候就需要我們自己開發(fā)自定義的函數來輔助完成了,這就是所謂的用戶自定義函數UDF(User-Defined Functions)。Hive支持三類自定義函數:

UDF:普通的用戶自定義函數。用來處理輸入一行,輸出一行的操作,類似Map操作。如轉換字符串大小寫,獲取字符串長度等

UDAF:用戶自定義聚合函數(User-defined aggregate function),用來處理輸入多行,輸出一行的操作,類似Reduce操作。比如MAX、COUNT函數。

UDTF:用戶自定義表產生函數(User defined table-generating function),用來處理輸入一行,輸出多行(即一個表)的操作, 不是特別常用

UDF函數其實就是一段遵循一定接口規(guī)范的程序。在執(zhí)行過程中Hive將SQL轉換為MapReduce程序,在執(zhí)行過程當中在執(zhí)行我們的UDF函數。

本小節(jié)簡單演示下自定義UDF函數,首先創(chuàng)建一個空的Maven項目,然后添加hive-exec依賴,版本與你安裝的Hive版本需對應上。完整的pom文件內容如下:

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

org.example

hive-udf-test

1.0-SNAPSHOT

org.apache.hive

hive-exec

3.1.2

org.apache.maven.plugins

maven-compiler-plugin

8

8

首先創(chuàng)建一個繼承UDF的類,我們實現的這個自定義函數功能就是簡單的獲取字段的長度:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

import org.apache.hadoop.io.Text;

public class StrLen extends UDF {

public int evaluate(final Text col) {

return col.getLength();

}

}

以上這種自定義函數只能支持處理普通類型的數據,如果要對復雜類型的數據做處理則需要繼承GenericUDF,并實現其抽象方法。例如,我們實現一個對測試數據中的scores字段求平均值的函數:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;

import org.apache.hadoop.hive.ql.metadata.HiveException;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;

import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.text.DecimalFormat;

public class AvgScore extends GenericUDF {

/**

* 函數的名稱

*/

private static final String FUNC_NAME = "AVG_SCORE";

/**

* 函數所作用的字段類型,這里是map類型

*/

private transient MapObjectInspector mapOi;

/**

* 控制精度只返回兩位小數

*/

DecimalFormat df = new DecimalFormat("#.##");

@Override

public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {

// 在此方法中可以做一些前置的校驗,例如檢測函數參數個數、檢測函數參數類型

mapOi = (MapObjectInspector) objectInspectors[0];

// 指定函數的輸出類型

return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;

}

@Override

public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {

// 函數的核心邏輯,取出map中的value進行求平均值,并返回一個Double類型的結果值

Object o = deferredObjects[0].get();

double v = mapOi.getMap(o).values().stream()

.mapToDouble(a -> Double.parseDouble(a.toString()))

.average()

.orElse(0.0);

return Double.parseDouble(df.format(v));

}

@Override

public String getDisplayString(String[] strings) {

return "func(map)";

}

}

對項目進行打包,并上傳到服務器中:

[root@hadoop01 ~/jars]# ls

hive-udf-test-1.0-SNAPSHOT.jar

[root@hadoop01 ~/jars]#

將jar包上傳到hdfs中:

[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs

[root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs

[root@hadoop01 ~/jars]# hdfs dfs -ls /udfs

Found 1 items

-rw-r--r-- 1 root supergroup 4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar

[root@hadoop01 ~/jars]#

在Hive中添加該jar包:

0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;

No rows affected (0.022 seconds)

0: jdbc:hive2://localhost:10000>

然后注冊臨時函數,臨時函數只會在當前的session中生效:

0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen";

No rows affected (0.026 seconds)

0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore";

No rows affected (0.008 seconds)

0: jdbc:hive2://localhost:10000>

使用自定義函數處理:

0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;

+------------+---------+------------+

| user_name | length | avg_score |

+------------+---------+------------+

| Tom | 3 | 80.25 |

| Jerry | 5 | 77.5 |

| Jim | 3 | 83.75 |

| Angela | 6 | 84.5 |

| Ann | 3 | 90.0 |

| Bella | 5 | 69.25 |

| Bonnie | 6 | 76.5 |

| Caroline | 8 | 84.5 |

+------------+---------+------------+

8 rows selected (0.083 seconds)

0: jdbc:hive2://localhost:10000>

刪除已注冊的臨時函數:

0: jdbc:hive2://localhost:10000> drop temporary function strlen;

No rows affected (0.01 seconds)

0: jdbc:hive2://localhost:10000> drop temporary function avg_score;

No rows affected (0.009 seconds)

0: jdbc:hive2://localhost:10000>

臨時函數只會在當前的session中生效,如果需要注冊成永久函數則只需要把TEMPORARY關鍵字給去掉即可。如下所示:

0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';

No rows affected (0.049 seconds)

0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';

No rows affected (0.026 seconds)

0: jdbc:hive2://localhost:10000>

刪除永久函數也是把TEMPORARY關鍵字給去掉即可。如下所示:

0: jdbc:hive2://localhost:10000> drop function strlen;

No rows affected (0.031 seconds)

0: jdbc:hive2://localhost:10000> drop function avg_score;

No rows affected (0.026 seconds)

0: jdbc:hive2://localhost:10000>

Hive存儲結構 - OrcFile

Hive支持的存儲格式:

TextFile是默認的存儲格式,通過簡單的分隔符可以對csv等類型的文件進行解析。但實際應用中通常都是使用OrcFile格式,因為ORCFile是列式存儲格式,更加適合大數據查詢的場景。

我們都知道關系型數據庫基本是使用行式存儲作為存儲格式,而大數據領域更多的是采用列式存儲,因為大數據分析場景中通常需要讀取大量行,但是只需要少數的幾個列。這也是為什么通常使用OrcFile作為Hive的存儲格式的原因。由此可見,大數據的絕大部分應用場景都是OLAP場景。

OLAP場景的特點

讀多于寫

不同于事務處理(OLTP)的場景,比如電商場景中加購物車、下單、支付等需要在原地進行大量insert、update、delete操作,數據分析(OLAP)場景通常是將數據批量導入后,進行任意維度的靈活探索、BI工具洞察、報表制作等。

數據一次性寫入后,分析師需要嘗試從各個角度對數據做挖掘、分析,直到發(fā)現其中的商業(yè)價值、業(yè)務變化趨勢等信息。這是一個需要反復試錯、不斷調整、持續(xù)優(yōu)化的過程,其中數據的讀取次數遠多于寫入次數。這就要求底層數據庫為這個特點做專門設計,而不是盲目采用傳統數據庫的技術架構。

大寬表,讀大量行但是少量列,結果集較小

在OLAP場景中,通常存在一張或是幾張多列的大寬表,列數高達數百甚至數千列。對數據分析處理時,選擇其中的少數幾列作為維度列、其他少數幾列作為指標列,然后對全表或某一個較大范圍內的數據做聚合計算。這個過程會掃描大量的行數據,但是只用到了其中的少數列。而聚合計算的結果集相比于動輒數十億的原始數據,也明顯小得多。

數據批量寫入,且數據不更新或少更新

OLTP類業(yè)務對于延時(Latency)要求更高,要避免讓客戶等待造成業(yè)務損失;而OLAP類業(yè)務,由于數據量非常大,通常更加關注寫入吞吐(Throughput),要求海量數據能夠盡快導入完成。一旦導入完成,歷史數據往往作為存檔,不會再做更新、刪除操作。

無需事務,數據一致性要求低

OLAP類業(yè)務對于事務需求較少,通常是導入歷史日志數據,或搭配一款事務型數據庫并實時從事務型數據庫中進行數據同步。多數OLAP系統都支持最終一致性。

靈活多變,不適合預先建模

分析場景下,隨著業(yè)務變化要及時調整分析維度、挖掘方法,以盡快發(fā)現數據價值、更新業(yè)務指標。而數據倉庫中通常存儲著海量的歷史數據,調整代價十分高昂。預先建模技術雖然可以在特定場景中加速計算,但是無法滿足業(yè)務靈活多變的發(fā)展需求,維護成本過高。

行式存儲和列式存儲

行式存儲和列式存儲的對比圖:

與行式存儲將每一行的數據連續(xù)存儲不同,列式存儲將每一列的數據連續(xù)存儲。相比于行式存儲,列式存儲在分析場景下有著許多優(yōu)良的特性:

如前所述,分析場景中往往需要讀大量行但是少數幾個列。在行存模式下,數據按行連續(xù)存儲,所有列的數據都存儲在一個block中,不參與計算的列在IO時也要全部讀出,讀取操作被嚴重放大。而列存模式下,只需要讀取參與計算的列即可,極大的減低了IO cost,加速了查詢。

同一列中的數據屬于同一類型,壓縮效果顯著。列存往往有著高達十倍甚至更高的壓縮比,節(jié)省了大量的存儲空間,降低了存儲成本。

更高的壓縮比意味著更小的data size,從磁盤中讀取相應數據耗時更短。

自由的壓縮算法選擇。不同列的數據具有不同的數據類型,適用的壓縮算法也就不盡相同。可以針對不同列類型,選擇最合適的壓縮算法。

高壓縮比,意味著同等大小的內存能夠存放更多數據,系統cache效果更好。

OrcFile

OrcFile存儲格式:

Orc列式存儲優(yōu)點:

查詢時只需要讀取查詢所涉及的列,降低IO消耗,同時保存每一列統計信息,實現部分謂詞下推

每列數據類型一致,可針對不同的數據類型采用其高效的壓縮算法

列式存儲格式假設數據不會發(fā)生改變,支持分片、流式讀取,更好的適應分布式文件存儲的特性

除了Orc外,Parquet也是常用的列式存儲格式。Orc VS Parquet:

OrcFile和Parquet都是Apache的頂級項目

Parquet不支持ACID、不支持更新,Orc支持有限的ACID和更新

Parquet的壓縮能力較高,Orc的查詢效率較高

離線數倉VS實時數倉

離線數倉:

離線數據倉庫主要基于Hive等技術來構建T+1的離線數據

通過定時任務每天拉取增量數據導入到Hive表中

創(chuàng)建各個業(yè)務相關的主題維度數據,對外提供T+1的數據查詢接口

離線數倉架構:

數據源通過離線的方式導入到離線數倉中

數據分層架構:ODS、DWD、 DM

下游應用根據業(yè)務需求選擇直接讀取DM

實時數倉:

實時數倉基于數據采集工具,將原始數據寫入到Kafka等數據通道

數據最終寫入到類似于HBase這樣支持快速讀寫的存儲系統

對外提供分鐘級別、甚至秒級別的查詢方案

實時數倉架構:

業(yè)務實時性要求的不斷提高,實時處理從次要部分變成了主要部分

Lambda架構:在離線大數據架構基礎上加了一個加速層,使用流處理技術完成實時性較高的指標計算

Kappa架構:以實時事件處理為核心,統一數據處理

圖解Lambda架構數據流程

Lambda 架構(Lambda Architecture)是由 Twitter 工程師南森·馬茨(Nathan Marz)提出的大數據處理架構。這一架構的提出基于馬茨在 BackType 和 Twitter 上的分布式數據處理系統的經驗。

Lambda 架構使開發(fā)人員能夠構建大規(guī)模分布式數據處理系統。它具有很好的靈活性和可擴展性,也對硬件故障和人為失誤有很好的容錯性。

Lambda 架構總共由三層系統組成:批處理層(Batch Layer),速度處理層(Speed Layer),以及用于響應查詢的服務層(Serving Layer)。

在 Lambda 架構中,每層都有自己所肩負的任務。批處理層存儲管理主數據集(不可變的數據集)和預先批處理計算好的視圖。批處理層使用可處理大量數據的分布式處理系統預先計算結果。它通過處理所有的已有歷史數據來實現數據的準確性。這意味著它是基于完整的數據集來重新計算的,能夠修復任何錯誤,然后更新現有的數據視圖。輸出通常存儲在只讀數據庫中,更新則完全取代現有的預先計算好的視圖。

速度處理層會實時處理新來的數據。速度層通過提供最新數據的實時視圖來最小化延遲。速度層所生成的數據視圖可能不如批處理層最終生成的視圖那樣準確或完整,但它們幾乎在收到數據后立即可用。而當同樣的數據在批處理層處理完成后,在速度層的數據就可以被替代掉了。

本質上,速度層彌補了批處理層所導致的數據視圖滯后。比如說,批處理層的每個任務都需要 1 個小時才能完成,而在這 1 個小時里,我們是無法獲取批處理層中最新任務給出的數據視圖的。而速度層因為能夠實時處理數據給出結果,就彌補了這 1 個小時的滯后。

所有在批處理層和速度層處理完的結果都輸出存儲在服務層中,服務層通過返回預先計算的數據視圖或從速度層處理構建好數據視圖來響應查詢。

所有的新用戶行為數據都可以同時流入批處理層和速度層。批處理層會永久保存數據并且對數據進行預處理,得到我們想要的用戶行為模型并寫入服務層。而速度層也同時對新用戶行為數據進行處理,得到實時的用戶行為模型。

而當“應該對用戶投放什么樣的廣告”作為一個查詢(Query)來到時,我們從服務層既查詢服務層中保存好的批處理輸出模型,也對速度層中處理的實時行為進行查詢,這樣我們就可以得到一個完整的用戶行為歷史了。

一個查詢就如下圖所示,既通過批處理層兼顧了數據的完整性,也可以通過速度層彌補批處理層的高延時性,讓整個查詢具有實時性。

Kappa 架構 VS Lambda

Lambda 架構的不足

雖然 Lambda 架構使用起來十分靈活,并且可以適用于很多的應用場景,但在實際應用的時候,Lambda 架構也存在著一些不足,主要表現在它的維護很復雜。

使用 Lambda 架構時,架構師需要維護兩個復雜的分布式系統,并且保證他們邏輯上產生相同的結果輸出到服務層中。舉個例子吧,我們在部署 Lambda 架構的時候,可以部署 Apache Hadoop 到批處理層上,同時部署 Apache Flink 到速度層上。

我們都知道,在分布式框架中進行編程其實是十分復雜的,尤其是我們還會針對不同的框架進行專門的優(yōu)化。所以幾乎每一個架構師都認同,Lambda 架構在實戰(zhàn)中維護起來具有一定的復雜性。

那要怎么解決這個問題呢?我們先來思考一下,造成這個架構維護起來如此復雜的根本原因是什么呢?

維護 Lambda 架構的復雜性在于我們要同時維護兩套系統架構:批處理層和速度層。我們已經說過了,在架構中加入批處理層是因為從批處理層得到的結果具有高準確性,而加入速度層是因為它在處理大規(guī)模數據時具有低延時性。

那我們能不能改進其中某一層的架構,讓它具有另外一層架構的特性呢?例如,改進批處理層的系統讓它具有更低的延時性,又或者是改進速度層的系統,讓它產生的數據視圖更具準確性和更加接近歷史數據呢?

另外一種在大規(guī)模數據處理中常用的架構——Kappa 架構(Kappa Architecture),便是在這樣的思考下誕生的。

Kappa 架構

Kappa 架構是由 LinkedIn 的前首席工程師杰伊·克雷普斯(Jay Kreps)提出的一種架構思想。克雷普斯是幾個著名開源項目(包括 Apache Kafka 和 Apache Samza 這樣的流處理系統)的作者之一,也是現在 Confluent 大數據公司的 CEO。

克雷普斯提出了一個改進 Lambda 架構的觀點:

我們能不能改進 Lambda 架構中速度層的系統性能,使得它也可以處理好數據的完整性和準確性問題呢?我們能不能改進 Lambda 架構中的速度層,使它既能夠進行實時數據處理,同時也有能力在業(yè)務邏輯更新的情況下重新處理以前處理過的歷史數據呢?

他根據自身多年的架構經驗發(fā)現,我們是可以做到這樣的改進的。我們知道像 Apache Kafka 這樣的流處理平臺是具有永久保存數據日志的功能的。通過Kafka的這一特性,我們可以重新處理部署于速度層架構中的歷史數據。

下面我就以 Kafka 為例來介紹整個全新架構的過程。

第一步,部署 Kafka,并設置數據日志的保留期(Retention Period)。

這里的保留期指的是你希望能夠重新處理的歷史數據的時間區(qū)間。例如,如果你希望重新處理最多一年的歷史數據,那就可以把 Apache Kafka 中的保留期設置為 365 天。如果你希望能夠處理所有的歷史數據,那就可以把 Apache Kafka 中的保留期設置為“永久(Forever)”。

第二步,如果我們需要改進現有的邏輯算法,那就表示我們需要對歷史數據進行重新處理。我們需要做的就是重新啟動一個 Kafka 作業(yè)實例(Instance)。這個作業(yè)實例將重頭開始,重新計算保留好的歷史數據,并將結果輸出到一個新的數據視圖中。

我們知道 Kafka 的底層是使用 Log Offset 來判斷現在已經處理到哪個數據塊了,所以只需要將 Log Offset 設置為 0,新的作業(yè)實例就會重頭開始處理歷史數據。

第三步,當這個新的數據視圖處理過的數據進度趕上了舊的數據視圖時,我們的應用便可以切換到從新的數據視圖中讀取。

第四步,停止舊版本的作業(yè)實例,并刪除舊的數據視圖。

這個架構就如同下圖所示。

與 Lambda 架構不同的是,Kappa 架構去掉了批處理層這一體系結構,而只保留了速度層。你只需要在業(yè)務邏輯改變又或者是代碼更改的時候進行數據的重新處理。Kappa 架構統一了數據的處理方式,不再維護離線和實時兩套代碼邏輯。

Kappa 架構的不足

Kappa 架構也是有著它自身的不足的。因為 Kappa 架構只保留了速度層而缺少批處理層,在速度層上處理大規(guī)模數據可能會有數據更新出錯的情況發(fā)生,這就需要我們花費更多的時間在處理這些錯誤異常上面。如果需求發(fā)生變化或歷史數據需要重新處理都得通過上游重放來完成。并且重新處理歷史的吞吐能力會低于批處理。

還有一點,Kappa 架構的批處理和流處理都放在了速度層上,這導致了這種架構是使用同一套代碼來處理算法邏輯的。所以 Kappa 架構并不適用于批處理和流處理代碼邏輯不一致的場景。

Lambda VS Kappa

主流大公司的實時數倉架構

阿里菜鳥實時數倉

美團實時數倉

實時數倉建設特征

整體架構設計通過分層設計為OLAP查詢分擔壓力

復雜的計算統一在實時計算層做,避免給OLAP查詢帶來過大的壓力

匯總計算通過OLAP數據查詢引擎進行

整個架構中實時計算一般 是Spark+Flink配合

消息隊列Kafka一家獨大,配合HBase、ES、 Mysq|進行數據落盤

OLAP領域Presto、Druid、 Clickhouse、 Greenplum等等層出不窮

總結

以上是生活随笔為你收集整理的数据仓库—stg层_数据仓库之Hive快速入门 - 离线实时数仓架构的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。