kylin KV+cube方案分析
2019獨角獸企業重金招聘Python工程師標準>>>
前言
??在使用Kylin的時候,最重要的一步就是創建cube的模型定義,即指定度量和維度以及一些附加信息,然后對cube進行build,當然我們也可以根據原始表中的某一個string字段(這個字段的格式必須是日期格式,表示日期的含義)設定分區字段,這樣一個cube就可以進行多次build,每一次的build會生成一個segment,每一個segment對應著一個時間區間的cube,這些segment的時間區間是連續并且不重合的,對于擁有多個segment的cube可以執行merge,相當于將一個時間區間內部的segment合并成一個。下面從源碼開始分析cube的build和merge過程。本文基于Kylin-1.0-incubating版本,對于Kylin的介紹可以參見:http://blog.csdn.net/yu616568/article/details/48103415
入口介紹
??在kylin的web頁面上創建完成一個cube之后可以點擊action下拉框執行build或者merge操作,這兩個操作都會調用cube的rebuild接口,調用的參數包括:1、cube名,用于唯一標識一個cube,在當前的kylin版本中cube名是全局唯一的,而不是每一個project下唯一的;2、本次構建的startTime和endTime,這兩個時間區間標識本次構建的segment的數據源只選擇這個時間范圍內的數據;對于BUILD操作而言,startTime是不需要的,因為它總是會選擇最后一個segment的結束時間作為當前segment的起始時間。3、buildType標識著操作的類型,可以是”BUILD”、”MERGE”和”REFRESH”。?
??這些操作的統一入口就是JobService.submitJob函數,該函數首先取出該cube所有關聯的構建cube的job,并且判斷這些job是否有處于READY、RUNNING、ERROR狀態,如果處于該狀態意味著這個job正在執行或者可以之后被resume執行,做這種限制的原因不得而知(可能是構建的區間是基于時間吧,需要對一個cube并行的構建多個segment(時間區間的數據)的需求并不明顯)。所以如果希望build或者merge cube,必須將未完成的cube的操作執行discard操作。然后根據操作類型執行具體的操作:?
1. 如果是BUILD,如果這個cube中包含distinct count聚合方式的度量并且這個cube中已經存在其他segment,則執行appendAndMergeSegments函數,否則執行buildJob函數。?
2. 如果是MERGE操作則執行mergeSegments函數。?
3. 如果是REFRESH,則同樣執行buildJob函數。為這個時間區間的segment重新構建。?
??buildJob函數構建一個新的segment,mergeSegments函數合并一個時間區間內的所有segments,appendAndMergeSegments函數則首先根據最后一個segment的時間區間的end值build一個新的segment然后再將所有的時間區間的segments進行合并(為什么包含distinct count的聚合函數的cube的構建一定要進行合并呢?這應該是有distinct-count使用的hyperloglog算法決定的,下次可以專門分析一下這個算法)。
BUILD操作
??Build操作是構建一個cube指定時間區間的數據,由于kylin基于預計算的方式提供數據查詢,構建操作是指將原始數據(存儲在Hadoop中,通過Hive獲取)轉換成目標數據(存儲在Hbase中)的過程。主要的步驟可以按照順序分為四個階段:1、根據用戶的cube信息計算出多個cuboid文件,2、根據cuboid文件生成htable,3、更新cube信息,4、回收臨時文件。每一個階段操作的輸入都需要依賴于上一步的輸出,所以這些操作全是順序執行的。
1. 計算cuboid文件
??在kylin的CUBE模型中,每一個cube是由多個cuboid組成的,理論上有N個普通維度的cube可以是由2的N次方個cuboid組成的,那么我們可以計算出最底層的cuboid,也就是包含全部維度的cuboid(相當于執行一個group by全部維度列的查詢),然后在根據最底層的cuboid一層一層的向上計算,直到計算出最頂層的cuboid(相當于執行了一個不帶group by的查詢),其實這個階段kylin的執行原理就是這個樣子的,不過它需要將這些抽象成mapreduce模型,提交mapreduce作業執行。
1.1 生成原始數據(Create Intermediate Flat Hive Table)
??這一步的操作是根據cube的定義生成原始數據,這里會新創建一個hive外部表,然后再根據cube中定義的星狀模型,查詢出維度(對于DERIVED類型的維度使用的是外鍵列)和度量的值插入到新創建的表中,這個表是一個外部表,表的數據文件(存儲在HDFS)作為下一個子任務的輸入,它首先根據維度中的列和度量中作為參數的列得到需要出現在該表中的列,然后執行三步hive操作,這三步hive操作是通過hive -e的方式執行的shell命令。?
??1. drop TABLE IF EXISTS xxx.?
??2. CREATE EXTERNAL TABLE IF NOT EXISTS xxx() ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\177’ STORED AS SEQUENCEFILE LOCATION xxxx,其中表名是根據當前的cube名和segment的uuid生成的,location是當前job的臨時文件,只有當insert插入數據的時候才會創建,注意這里每一行的分隔符指定的是’\177’(目前是寫死的,十進制為127).?
??3. 插入數據,在執行之前需要首先設置一些配置項,這些配置項通過hive的SET命令設置,是根據這個cube的job的配置文件(一般是在kylin的conf目錄下)設置的,最后執行的是INSERT OVERWRITE TABLE xxx SELECT xxxx語句,SELECT子句中選出cube星狀模型中事實表與維度表按照設置的方式join之后的出現在維度或者度量參數中的列(特殊處理derived列),然后再加上用戶設置的where條件和partition的時間條件(根據輸入build的參數).?
??需要注意的是這里無論用戶設置了多少維度和度量,每次join都會使用事實表和所有的維度表進行join,這可能造成不必要的性能損失(多一個join會影響hive性能,畢竟要多讀一些文件)。這一步執行完成之后location指定的目錄下就有了原始數據的文件,為接下來的任務提供了輸入。
1.2 創建事實表distinct column文件(Extract Fact Table Distinct Columns)
??在這一步是根據上一步生成的hive表計算出還表中的每一個出現在事實表中的度量的distinct值,并寫入到文件中,它是啟動一個MR任務完成的,MR任務的輸入是HCatInputFormat,它關聯的表就是上一步創建的臨時表,這個MR任務的map階段首先在setup函數中得到所有度量中出現在事實表的度量在臨時表的index,根據每一個index得到該列在臨時表中在每一行的值value,然后將<index, value>作為mapper的輸出,該任務還啟動了一個combiner,它所做的只是對同一個key的值進行去重(同一個mapper的結果),reducer所做的事情也是進行去重(所有mapper的結果),然后將每一個index對應的值一行行的寫入到以列名命名的文件中。如果某一個維度列的distinct值比較大,那么可能導致MR任務執行過程中的OOM。?
??對于這一步我有一個疑問就是既然所有的原始數據都已經通過第一步存入到臨時hive表中了,我覺得接下來就不用再區分維度表和事實表了,所有的任務都基于這個臨時表,那么這一步就可以根據臨時表計算出所有的維度列的distinct column值,但是這里僅僅針對出現在事實表上的維度,不知道這樣做的原因是什么?難道是因為在下一步會單獨計算維度表的dictionary以及snapshot?
1.3 創建維度詞典(Build Dimension Dictionary)
??這一步是根據上一步生成的distinct column文件和維度表計算出所有維度的詞典信息,詞典是為了節約存儲而設計的,用于將一個成員值編碼成一個整數類型并且可以通過整數值獲取到原始成員值,每一個cuboid的成員是一個key-value形式存儲在hbase中,key是維度成員的組合,但是一般情況下維度是一些字符串之類的值(例如商品名),所以可以通過將每一個維度值轉換成唯一整數而減少內存占用,在從hbase查找出對應的key之后再根據詞典獲取真正的成員值。?
?? 這一步是在kylin進程內的一個線程中執行的,它會創建所有維度的dictionary,如果是事實表上的維度則可以從上一步生成的文件中讀取該列的distinct成員值(FileTable),否則則需要從原始的hive表中讀取每一列的信息(HiveTable),根據不同的源(文件或者hive表)獲取所有的列去重之后的成員列表,然后根據這個列表生成dictionary,kylin中針對不同類型的列使用不同的實現方式,對于time之類的(date、time、dtaetime和timestamp)使用DateStrDictionary,這里目前還存在著一定的問題,因為這種編碼方式會首先將時間轉換成‘yyyy-MM-dd’的格式,會導致timestamp之類的精確時間失去天以后的精度。針對數值型的使用NumberDictionary,其余的都使用一般的TrieDictionary(字典樹)。這些dictionary會作為cube的元數據存儲的kylin元數據庫里面,執行query的時候進行轉換。?
?? 之后還需要計算維度表的snapshotTable,每一個snapshot是和一個hive維度表對應的,生成的過程是:首先從原始的hive維度表中順序得讀取每一行每一列的值,然后使用TrieDictionary方式對這些所有的值進行編碼,這樣每一行每一列的之都能夠得到一個編碼之后的id(相同的值id也相同),然后再次讀取原始表中每一行的值,將每一列的值使用編碼之后的id進行替換,得到了一個只有id的新表,這樣同時保存這個新表和dictionary對象(id和值得映射關系)就能夠保存整個維度表了,同樣,kylin也會將這個數據存儲元數據庫中。?
?? 針對這一步需要注意的問題:首先,這一步的兩個步驟都是在kylin進程的一個線程中執行的,第一步會加載某一個維度的所有distinct成員到內存,如果某一個維度的cardinality比較大 ,可能會導致內存出現OOM,然后在創建snapshotTable的時候會限制原始表的大小不能超過配置的一個上限值,如果超過則會執行失敗。但是應該強調的是這里加載全部的原始維度表更可能出現OOM。另外,比較疑惑的是:1、為什么不在上一步的MR任務中直接根據臨時表中的數據生成每一個distinct column值,而是從原始維度表中讀取?2、計算全表的dictionary是為了做什么?我目前只了解對于drived維度是必要保存主鍵和列之間的映射,但是需要保存整個維度表?!
1.4 計算生成BaseCuboid文件(Build Base Cuboid Data)
?? 何謂Base cuboid呢?假設一個cube包含了四個維度:A/B/C/D,那么這四個維度成員間的所有可能的組合就是base cuboid,這就類似在查詢的時候指定了select count(1) from xxx group by A,B,C,D;這個查詢結果的個數就是base cuboid集合的成員數。這一步也是通過一個MR任務完成的,輸入是臨時表的路徑和分隔符,map對于每一行首先進行split,然后獲取每一個維度列的值組合作為rowKey,但是rowKey并不是簡單的這些維度成員的內容組合,而是首先將這些內容從dictionary中查找出對應的id,然后組合這些id得到rowKey,這樣可以大大縮短hbase的存儲空間,提升查找性能。然后在查找該行中的度量列,根據cube定義中度量的函數返回對該列計算之后的值。這個MR任務還會執行combiner過程,執行邏輯和reducer相同,在reducer中的key是一個rowKey,value是相同的rowKey的measure組合的數組,reducer回分解出每一個measure的值,然后再根據定義該度量使用的聚合函數計算得到這個rowKey的結果,其實這已經類似于hbase存儲的格式了。
1.5 計算第N層cuboid文件(Build N-Dimension Cuboid Data)
??這一個流程是由多個步驟的,它是根據維度組合的cuboid的總數決定的,上一層cuboid執行MR任務的輸入是下一層cuboid計算的輸出,由于最底層的cuboid(base)已經計算完成,所以這幾步不需要依賴于任何的hive信息,它的reducer和base cuboid的reducer過程基本一樣的(相同rowkey的measure執行聚合運算),mapper的過程只需要根據這一行輸入的key(例如A、B、C、D中某四個成員的組合)獲取可能的下一層的的組合(例如只有A、B、C和B、C、D),那么只需要將這些可能的組合提取出來作為新的key,value不變進行輸出就可以了。?
舉個例子,假設一共四個維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個measure(對于這列V,計算sum(V)),這里忽略dictionary編碼。原始表如下:
| A1 | B1 | C1 | D1 | 2 |
| A1 | B2 | C1 | D1 | 3 |
| A2 | B1 | C1 | D1 | 5 |
| A3 | B1 | C1 | D1 | 6 |
| A3 | B2 | C1 | D1 | 8 |
那么base cuboid最終的輸出如下?
(<A1、B1、C1、D1>、2)?
(<A1、B2、C1、D1>, 3)?
(<A2、B1、C1、D1>, 5)?
(<A3、B1、C1、D1>, 6)?
(<A3、B2、C1、D1>, 8)?
那么它作為下面一個cuboid的輸入,對于第一行輸入?
(<A1、B1、C1、D1>, 2),mapper執行完成之后會輸出?
(<A1、B1、C1>, 2)、?
(<A1、B1、D1>, 2)、?
(<A1、C1、D1>, 2)、?
(<B1、C1、D1>, 2)這四項,同樣對于其他的內一行也會輸出四行,最終他們經過reducer的聚合運算,得到如下的結果:?
(<A1、B1、C1>, 2)?
(<A1、B1、D1>, 2)?
(<A1、C1、D1>, 2 + 3)?
(<B1、C1、D1>,2 + 5 +6)?
...?
這樣一次將下一層的結果作為輸入計算上一層的cuboid成員,直到最頂層的cuboid,這一個層cuboid只包含一個成員,不按照任何維度進行group by。?
??上面的這些步驟用于生成cuboid,假設有N個維度(對于特殊類型的),那么就需要有N +1層cuboid,每一層cuboid可能是由多個維度的組合,但是它包含的維度個數相同。
2 準備輸出
??在上面幾步中,我們已經將每一層的cuboid計算完成,每一層的cuboid文件都是一些cuboid的集合,每一層的cuboid的key包含相同的維度個數,下面一步就是將這些cuboid文件導入到hbase中。
2.1 計算分組
??這一步的輸入是之前計算的全部的cuboid文件,按照cuboid文件的順序(層次的順序)一次讀取每一個key-value,再按照key-value的形式統計每一個key和value占用的空間大小,然后以GB為單位,mapper階段的輸出是每當統計到1GB的數據,將當前的這個key和當前數據量總和輸出,在reducer階段根據用戶創建cube時指定的cube大小(SMALL,MEDIUM和LARGE)和總的大小計算出實際需要劃分為多少分區,這時還需要參考最多分區數和最少分區數進行計算,再根據實際數據量大小和分區數計算出每一個分區的邊界key,將這個key和對應的分區編號輸出到最終文件中,為下一步創建htable做準備。
2.2 創建HTable
??這一步非常簡單,根據上一步計算出的rowKey分布情況(split數組)創建HTable,創建一個HTable的時候還需要考慮一下幾個事情:1、列組的設置,2、每一個列組的壓縮方式,3、部署coprocessor,4、HTable中每一個region的大小。在這一步中,列組的設置是根據用戶創建cube時候設置的,在hbase中存儲的數據key是維度成員的組合,value是對應聚合函數的結果,列組針對的是value的,一般情況下在創建cube的時候只會設置一個列組,該列包含所有的聚合函數的結果;在創建HTable時默認使用LZO壓縮,如果不支持LZO則不進行壓縮,在后面kylin的版本中支持更多的壓縮方式;kylin強依賴于hbase的coprocessor,所以需要在創建HTable為該表部署coprocessor,這個文件會首先上傳到HBase所在的HDFS上,然后在表的元信息中關聯,這一步很容易出現錯誤,例如coprocessor找不到了就會導致整個regionServer無法啟動,所以需要特別小心;region的劃分已經在上一步確定了,所以這里不存在動態擴展的情況,所以kylin創建HTable使用的接口如下:?
public void createTable( final HTableDescriptor desc , byte [][] splitKeys)
2.3 構建hfile文件
??創建完了HTable之后一般會通過插入接口將數據插入到表中,但是由于cuboid中的數據量巨大,頻繁的插入會對Hbase的性能有非常大的影響,所以kylin采取了首先將cuboid文件轉換成HTable格式的Hfile文件,然后在通過bulkLoad的方式將文件和HTable進行關聯,這樣可以大大降低Hbase的負載,這個過程通過一個MR任務完成。?
??這個任務的輸入是所有的cuboid文件,在mapper階段根據每一個cuboid成員的key-value輸出,如果cube定義時指定了多個列組,那么同一個key要按照不同列組中的值分別輸出,例如在cuboid文件中存在一行cuboid=1,key=1,value=sum(cost),count(1)的數據,而cube中將這兩個度量劃分到兩個列組中,這時候對于這一行數據,mapper的輸出為<1, sum(cost)>和<1,count(1)>。reducer使用的是org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer,它會按照行排序輸出,如果一行中包含多個值,那么會將這些值進行排序再輸出。輸出的格式則是根據HTable的文件格式定義的。
2.4 BulkLoad文件
??這一步將HFile文件load到HTable中,因為load操作會將原始的文件刪除(相當于remove),在操作之前首先將所有列組的Hfile的權限都設置為777,然后再啟動LoadIncrementalHFiles任務執行load操作,它的輸入為文件的路徑和HTable名,這一步完全依賴于HBase的工具。這一步完成之后,數據已經存儲到HBase中了,key的格式由cuboid編號+每一個成員在字典樹的id組成,value可能保存在多個列組里,包含在原始數據中按照這幾個成員進行GROUP BY計算出的度量的值。
3 收尾工作
??執行完上一步就已經完成了從輸入到輸出的計算過程,接下來要做的就是一些kylin內部的工作,分別是更新元數據,更新cube狀態,垃圾數據回收。
3.1 更新狀態
??這一步主要是更新cube的狀態,其中需要更新的包括cube是否可用、以及本次構建的數據統計,包括構建完成的時間,輸入的record數目,輸入數據的大小,保存到Hbase中數據的大小等,并將這些信息持久到元數據庫中。
3.2 垃圾文件回收
??這一步是否成功對正確性不會有任何影響,因為經過上一步之后這個segment就可以在這個cube中被查找到了,但是在整個執行過程中產生了很多的垃圾文件,其中包括:1、臨時的hive表,2、因為hive表是一個外部表,存儲該表的文件也需要額外刪除,3、fact distinct 這一步將數據寫入到HDFS上為建立詞典做準備,這時候也可以刪除了,4、rowKey統計的時候會生成一個文件,此時可以刪除。5、生成HFile時文件存儲的路徑和hbase真正存儲的路徑不同,雖然load是一個remove操作,但是上層的目錄還是存在的,也需要刪除。這一步kylin做的比較簡單,并沒有完全刪除所有的臨時文件,其實在整個計算過程中,真正還需要保留的數據只有多個cuboid文件(需要增量build的cube),這個因為在不同segment進行merge的時候是基于cuboid文件的,而不是根據HTable的。
??在Kylin-1.x版本中,整個cube的一個build的過程大概就是這樣,這樣的一個build只不過是生成一虐segment,而當一個cube中存在多個segment時可能需要將它們進行merge,merge的過程和build的流程大致是相同的,不過它不需要從頭開始,只需要對字典進行merge,然后在對cuboid文件進行merge,最后生成一個新的HTable。?
但是在Kylin-2.x版本中,整個家溝發生了很大的變化,build的引擎也分成了多套,分別是原始的MR引擎,基于Fast Cubing的MR引擎和Spark引擎,這使得build進行的更迅速,大大降低等待時間,后面會持續的再對新的引擎進行分析。
Kylin Cube Build的接口說明
每一個Cube需要設置數據源、計算引擎和存儲引擎,工廠類負責創建數據源對象、計算引擎對象和存儲引擎對象
三者之間通過適配器進行串聯
數據源接口(ISource)
public interface ISource extends Closeable {
? ? // 同步數據源中表的元數據信息
? ? ISourceMetadataExplorer getSourceMetadataExplorer();
? ? // 適配制定的構建引擎接口
? ? <I> I adaptToBuildEngine(Class<I> engineInterface);
? ? // 順序讀取表
? ? IReadableTable createReadableTable(TableDesc tableDesc);
? ? // 構建之前豐富數據源的Partition
? ? SourcePartition enrichSourcePartitionBeforeBuild(IBuildable buildable, SourcePartition srcPartition);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
存儲引擎接口(IStorage)
public interface IStorage {
? ? // 創建一個查詢指定Cube的對象
? ? public IStorageQuery createQuery(IRealization realization);
? ? public <I> I adaptToBuildEngine(Class<I> engineInterface);
}
1
2
3
4
5
6
7
8
計算引擎接口(IBatchCubingEngine)
public interface IBatchCubingEngine {
? ? public IJoinedFlatTableDesc getJoinedFlatTableDesc(CubeSegment newSegment);
? ? // 返回一個工作流計劃, 用以構建指定的CubeSegment
? ? public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter);
? ?// 返回一個工作流計劃, 用以合并指定的CubeSegment
? ? public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
? ?// 返回一個工作流計劃, 用以優化指定的CubeSegment
? ? public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter);
? ? public Class<?> getSourceInterface();
? ? public Class<?> getStorageInterface();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
離線Cube Build 調用鏈
Rest API請求/{cubeName}/rebuild, 調用CubeController.rebuild()方法 -> jobService.submitJob()
Project級別的權限校驗: aclEvaluate.checkProjectOperationPermission(cube);
ISource source = SourceManager.getSource(cube)根據CubeInstance的方法getSourceType()的返回值決定ISource的對象類型
public int getSourceType() {
? ? return getModel().getRootFactTable().getTableDesc().getSourceType();
}
1
2
3
分配新的segment: newSeg = getCubeManager().appendSegment(cube, src);
EngineFactory根據Cube定義的engine type, 創建對應的IBatchCubingEngine對象 -> 調用createBatchCubingJob()方法創建作業鏈,MRBatchCubingEngine2新建的是BatchCubingJobBuilder2
public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
? ? super(newSegment, submitter);
? ? this.inputSide = MRUtil.getBatchCubingInputSide(seg);
? ? this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
} ? ?
1
2
3
4
5
適配輸入數據源到構建引擎
SourceManager.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(flatDesc);
public static <T> T createEngineAdapter(ISourceAware table, Class<T> engineInterface) {
? ? return getSource(table).adaptToBuildEngine(engineInterface);
}
// HiveSource返回的是HiveMRInput
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
? ? if (engineInterface == IMRInput.class) {
? ? ? ? return (I) new HiveMRInput();
? ? } else {
? ? ? ? throw new RuntimeException("Cannot adapt to " + engineInterface);
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
適配存儲引擎到構建引擎
StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
public static <T> T createEngineAdapter(IStorageAware aware, Class<T> engineInterface) {
? ? return storage(aware).adaptToBuildEngine(engineInterface);
}
// HBaseStorage返回的是HBaseMROutput2Transition
public <I> I adaptToBuildEngine(Class<I> engineInterface) {
? ? if (engineInterface == IMROutput2.class) {
? ? ? ? return (I) new HBaseMROutput2Transition();
? ? } else {
? ? ? ? throw new RuntimeException("Cannot adapt to " + engineInterface);
? ? }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
適配成功后, new BatchCubingJobBuilder2(newSegment, submitter).build()該方法創建具體的執行步驟, 形成工作流
將工作流添加到執行管理器,等待調度執行: getExecutableManager().addJob(job);
---------------------?
本文主要介紹了Apache Kylin是如何將Hive表中的數據轉化為HBase的KV結構,并簡單介紹了Kylin的SQL查詢是如何轉化為HBase的Scan操作。
Apache Kylin 是什么
Apache Kylin是一個開源的、基于Hadoop生態系統的OLAP引擎(OLAP查詢引擎、OLAP多維分析引擎),能夠通過SQL接口對十億、甚至百億行的超大數據集實現秒級的多維分析查詢。
Apache?Kylin 核心:Kylin OLAP引擎基礎框架,包括元數據引擎,查詢引擎,Job(Build)引擎及存儲引擎等,同時包括REST服務器以響應客戶端請求。
OLAP 是什么
即聯機分析處理:以復雜的分析型查詢為主,需要掃描,聚合大量數據。
Kylin如何實現超大數據集的秒級多維分析查詢
預計算
對于超大數據集的復雜查詢,既然現場計算需要花費較長時間,那么根據空間換時間的原理,我們就可以提前將所有可能的計算結果計算并存儲下來,從而實現超大數據集的秒級多維分析查詢。
Kylin的預計算是如何實現的
將數據源Hive表中的數據按照指定的維度和指標 由計算引擎MapReduce離線計算出所有可能的查詢結果(即Cube)存儲到HBase中。
Cube 和 Cuboid是什么
簡單地說,一個cube就是一個Hive表的數據按照指定維度與指標計算出的所有組合結果。
其中每一種維度組合稱為cuboid,一個cuboid包含一種具體維度組合下所有指標的值。
如下圖,整個立方體稱為1個cube,立方體中每個網格點稱為1個cuboid,圖中(A,B,C,D)和(A,D)都是cuboid,特別的,(A,B,C,D)稱為Base cuboid。cube的計算過程是逐層計算的,首先計算Base cuboid,然后計算維度數依次減少,逐層向下計算每層的cuboid。
圖1
Build引擎Cube構建流程
BatchCubingJobBuilder2.build方法邏輯如下:
??public CubingJob build() {
????????logger.info("MR_V2 new job to BUILD segment " + seg);???????
????????final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
????????final String jobId = result.getId();
????????final String cuboidRootPath = getCuboidRootPath(jobId);
??????
????????// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
????????// 根據事實表和維表抽取需要的維度和度量,創建一張寬表或平表,并且進行文件再分配(執行Hive命令行來完成操作)
????????inputSide.addStepPhase1_CreateFlatTable(result);???????
?
????????// Phase 2: Build Dictionary
????????// 創建字典由三個子任務完成,由MR引擎完成,分別是抽取維度值(包含抽樣統計)、創建維度字典和保存統計信息
????????result.addTask(createFactDistinctColumnsStep(jobId));
????????result.addTask(createBuildDictionaryStep(jobId));
????????result.addTask(createSaveStatisticsStep(jobId));
????????// add materialize lookup tables if needed
????????LookupMaterializeContext lookupMaterializeContext = addMaterializeLookupTableSteps(result);
?
????????// 創建HTable
????????outputSide.addStepPhase2_BuildDictionary(result);
??????
????????// Phase 3: Build Cube
????????// 構建Cube,包含兩種Cube構建算法,分別是逐層算法和快速算法,在執行時會根據源數據的統計信息自動選擇一種算法(各個Mapper的小Cube的行數之和 / reduce后的Cube行數 > 7,重復度高就選逐層算法,重復度低就選快速算法)
????????addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
????????addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
????????// 構建HFile文件及把HFile文件BulkLoad到HBase
????????outputSide.addStepPhase3_BuildCube(result);
???????
????????// Phase 4: Update Metadata & Cleanup
????????// 更新Cube元數據,其中需要更新的包括cube是否可用、以及本次構建的數據統計,包括構建完成的時間,輸入的record數目,輸入數據的大小,保存到Hbase中數據的大小等,并將這些信息持久到元數據庫中
?
????????// 以及清理臨時數據,是在整個執行過程中產生了很多的垃圾文件,其中包括:1、臨時的hive表,2、因為hive表是一個外部表,存儲該表的文件也需要額外刪除,3、fact distinct 這一步將數據寫入到HDFS上為建立詞典做準備,這時候也可以刪除了,4、rowKey統計的時候會生成一個文件,此時可以刪除。
?
????????result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
????????inputSide.addStepPhase4_Cleanup(result);
????????outputSide.addStepPhase4_Cleanup(result);????????
?
????????return result;
????}
一、?根據事實表和維表抽取需要的維度和度量,創建一張寬表或平表,并且進行文件再分配
1.1 生成Hive寬表或平表(Create Intermediate Flat Hive Table)(執行Hive命令行)
這一步的操作是根據cube的定義生成原始數據,這里會新創建一個hive外部表,然后再根據cube中定義的星狀模型,查詢出維度(對于DERIVED類型的維度使用的是外鍵列)和度量的值插入到新創建的表中,這個表是一個外部表,表的數據文件(存儲在HDFS)作為下一個子任務的輸入,它首先根據維度中的列和度量中作為參數的列得到需要出現在該表中的列,然后執行三步hive操作,這三步hive操作是通過hive -e的方式執行的shell命令。
??1. drop TABLE IF EXISTS xxx
??2. CREATE EXTERNAL TABLE IF NOT EXISTS xxx() ROW FORMAT DELIMITED FIELDS TERMINATED BY '\177' STORED AS SEQUENCEFILE LOCATION xxxx,其中表名是根據當前的cube名和segment的uuid生成的,location是當前job的臨時文件,只有當insert插入數據的時候才會創建,注意這里每一行的分隔符指定的是'\177'(目前是寫死的,十進制為127)。
??3. 插入數據,在執行之前需要首先設置一些配置項,這些配置項通過hive的SET命令設置,是根據這個cube的job的配置文件(一般是在kylin的conf目錄下)設置的,最后執行的是INSERT OVERWRITE TABLE xxx SELECT xxxx語句,SELECT子句中選出cube星狀模型中事實表與維度表按照設置的方式join之后的出現在維度或者度量參數中的列(特殊處理derived列),然后再加上用戶設置的where條件和partition的時間條件(根據輸入build的參數)。
??需要注意的是這里無論用戶設置了多少維度和度量,每次join都會使用事實表和所有的維度表進行join,這可能造成不必要的性能損失(多一個join會影響hive性能,畢竟要多讀一些文件)。這一步執行完成之后location指定的目錄下就有了原始數據的文件,為接下來的任務提供了輸入。
JoinedFlatTable.generateDropTableStatement(flatDesc);
JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
JoinedFlatTable.generateInsertDataStatement(flatDesc);
二、?提取緯度值、創建維度字典和保存統計信息
2.1 提取事實表維度去重值(Extract Fact Table Distinct Columns)(執行一個MapReduce任務,包含抽取緯度值及統計各Mapper間的重復度兩種任務)
????在這一步是根據上一步生成的hive表計算出還表中的每一個出現在事實表中的維度的distinct值,并寫入到文件中,它是啟動一個MR任務完成的,MR任務的輸入是HCatInputFormat,它關聯的表就是上一步創建的臨時表,這個MR任務的map階段首先在setup函數中得到所有維度中出現在事實表的維度列在臨時表的index,根據每一個index得到該列在臨時表中在每一行的值value,然后將<index+value,EMPTY_TEXT>作為mapper的輸出,通過index決定由哪個Reduce處理(而Reduce啟動的時候根據ReduceTaskID如0000,0001來初始化決定處理哪個index對應的維度列),該任務還啟動了一個combiner,它所做的只是對同一個key(維度值)進行去重(同一個mapper的結果),reducer所做的事情也是進行key(維度值)去重(所有mapper的結果),然后在Reduce中將該維度列去重后的維度值一行行的寫入到以列名命名的文件中(注意kylin實現的方式,聚合的key是緯度值,而不是index)。
提取事實表維度列的唯一值是通過FactDistinctColumnsJob這個MapReduce來完成,核心思想是每個Reduce處理一個維度列,然后每個維度列Reduce單獨輸出該維度列對應的去重后的數據文件(output written to baseDir/colName/-r-00000,baseDir/colName2/-r-00001 or 直接輸出字典?output written to baseDir/colName/colName.rldict-r-00000)。另外會輸出各Mapper間重復度統計文件(output written to baseDir/statistics/statistics-r-00000,baseDir/statistics/statistics-r-00001)
FactDistinctColumnsJob
FactDistinctColumnsMapper
FactDistinctColumnPartitioner
FactDistinctColumnsCombiner
FactDistinctColumnsReducer
org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper
org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer
?
在FactDistinctColumnsMapper中輸出維度值或通過HHL近似算法統計每個Mapper中各個CuboID的去重行數
? ? public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
? ? ? ? Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
? ? ? ? for (String[] row : rowCollection) {
? ? ? ? ? ? context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
? ? ? ? ? ? for (int i = 0; i < allCols.size(); i++) {
? ? ? ? ? ? ? ? String fieldValue = row[columnIndex[i]];
? ? ? ? ? ? ? ? if (fieldValue == null)
? ? ? ? ? ? ? ? ? ? continue;
? ? ? ? ? ? ? ? final DataType type = allCols.get(i).getType();
? ? ? ? ? ? ? ? if (dictColDeduper.isDictCol(i)) {
? ? ? ? ? ? ? ? ? ? if (dictColDeduper.add(i, fieldValue)) {
? ? ? ? ? ? ? ? ? ? ? ? // 輸出維度值,KEY=COLUMN_INDEX+COLUME_VALUE,VALUE=EMPTY_TEXT
? ? ? ? ? ? ? ? ? ? ? ? writeFieldValue(context, type, i, fieldValue);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? DimensionRangeInfo old = dimensionRangeInfoMap.get(i);
? ? ? ? ? ? ? ? ? ? if (old == null) {
? ? ? ? ? ? ? ? ? ? ? ? old = new DimensionRangeInfo(fieldValue, fieldValue);
? ? ? ? ? ? ? ? ? ? ? ? dimensionRangeInfoMap.put(i, old);
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? old.setMax(type.getOrder().max(old.getMax(), fieldValue));
? ? ? ? ? ? ? ? ? ? ? ? old.setMin(type.getOrder().min(old.getMin(), fieldValue));
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? // 抽樣統計,KEY=CUBOID,VALUE=HLLCount
? ? ? ? ? ? if (rowCount % 100 < samplingPercentage) {
? ? ? ? ? ? ? ? putRowKeyToHLL(row);
? ? ? ? ? ? }
? ? ? ? ? ??
? ? ? ? ? ? if (rowCount % 100 == 0) {
? ? ? ? ? ? ? ? dictColDeduper.resetIfShortOfMem();
? ? ? ? ? ? }
? ? ? ? ? ? rowCount++;
? ? ? ? }
? ? }
? ? protected void doCleanup(Context context) throws IOException, InterruptedException {
? ? ? ? ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
? ? ? ? // output each cuboid's hll to reducer, key is 0 - cuboidId
? ? ? ? for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
? ? ? ? ? ? cuboidStatCalculator.waitForCompletion();
? ? ? ? }
? ? ? ? for (CuboidStatCalculator cuboidStatCalculator : cuboidStatCalculators) {
? ? ? ? ? ? Long[] cuboidIds = cuboidStatCalculator.getCuboidIds();
? ? ? ? ? ? HLLCounter[] cuboidsHLL = cuboidStatCalculator.getHLLCounters();
? ? ? ? ? ? HLLCounter hll;
? ? ? ? ? ? // 輸出各個CuboID的去重行數HLLCount
? ? ? ? ? ? for (int i = 0; i < cuboidIds.length; i++) {
? ? ? ? ? ? ? ? hll = cuboidsHLL[i];
? ? ? ? ? ? ? ? tmpbuf.clear();
? ? ? ? ? ? ? ? tmpbuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
? ? ? ? ? ? ? ? tmpbuf.putLong(cuboidIds[i]);
? ? ? ? ? ? ? ? outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
? ? ? ? ? ? ? ? hllBuf.clear();
? ? ? ? ? ? ? ? hll.writeRegisters(hllBuf);
? ? ? ? ? ? ? ? outputValue.set(hllBuf.array(), 0, hllBuf.position());
? ? ? ? ? ? ? ? sortableKey.init(outputKey, (byte) 0);
? ? ? ? ? ? ? ? context.write(sortableKey, outputValue);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? for (Integer colIndex : dimensionRangeInfoMap.keySet()) {
? ? ? ? ? ? DimensionRangeInfo rangeInfo = dimensionRangeInfoMap.get(colIndex);
? ? ? ? ? ? DataType dataType = allCols.get(colIndex).getType();
? ? ? ? ? ? writeFieldValue(context, dataType, colIndex, rangeInfo.getMin());
? ? ? ? ? ? writeFieldValue(context, dataType, colIndex, rangeInfo.getMax());
? ? ? ? }
? ? }
?
在FactDistinctColumnPartitioner中根據SelfDefineSortableKey(COLUMN_INDEX)選擇分區
? ? public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) {
? ? ? ? Text key = skey.getText();
? ? ? ? // 統計任務
? ? ? ? if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) {
? ? ? ? ? ? Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
? ? ? ? ? ? return reducerMapping.getReducerIdForCuboidRowCount(cuboidId);
? ? ? ? } else {
? ? ? ? ? ? // 抽取緯度值任務,直接根據COLUMN_INDEX指定分區
? ? ? ? ? ? return BytesUtil.readUnsigned(key.getBytes(), 0, 1);
? ? ? ? }
? ? }
?
在FactDistinctColumnsReducer中輸出去重后的維度值或輸出通過HLL近似算法統計CuboID去重后的行數
? ? public void doReduce(SelfDefineSortableKey skey, Iterable<Text> values, Context context) throws IOException, InterruptedException {
? ? ? ? Text key = skey.getText();
? ? ? ??
? ? ? ? // 統計邏輯
? ? ? ? if (isStatistics) {
? ? ? ? ? ? // for hll
? ? ? ? ? ? long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG);
? ? ? ? ? ? for (Text value : values) {
? ? ? ? ? ? ? ? HLLCounter hll = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
? ? ? ? ? ? ? ? ByteBuffer bf = ByteBuffer.wrap(value.getBytes(), 0, value.getLength());
? ? ? ? ? ? ? ? hll.readRegisters(bf);
? ? ? ? ? ? ? ? // 累計Mapper輸出的各個CuboID未去重的行數(每個Reduce處理部分CuboIDs)
? ? ? ? ? ? ? ? totalRowsBeforeMerge += hll.getCountEstimate();
? ? ? ? ? ? ? ? if (cuboidId == baseCuboidId) {
? ? ? ? ? ? ? ? ? ? baseCuboidRowCountInMappers.add(hll.getCountEstimate());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? // 合并CuboID
? ? ? ? ? ? ? ? if (cuboidHLLMap.get(cuboidId) != null) {
? ? ? ? ? ? ? ? ? ? cuboidHLLMap.get(cuboidId).merge(hll);
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? cuboidHLLMap.put(cuboidId, hll);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? } else {
? ? ? ? ? ? String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
? ? ? ? ? ? logAFewRows(value);
? ? ? ? ? ? // if dimension col, compute max/min value
? ? ? ? ? ? if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
? ? ? ? ? ? ? ? if (minValue == null || col.getType().compare(minValue, value) > 0) {
? ? ? ? ? ? ? ? ? ? minValue = value;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (maxValue == null || col.getType().compare(maxValue, value) < 0) {
? ? ? ? ? ? ? ? ? ? maxValue = value;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //if dict column
? ? ? ? ? ? if (cubeDesc.getAllColumnsNeedDictionaryBuilt().contains(col)) {
? ? ? ? ? ? ? ? if (buildDictInReducer) {
? ? ? ? ? ? ? ? ? ? // 如果需要在Reduce階段構建詞典,則在doCleanup后構建完輸出詞典文件
? ? ? ? ? ? ? ? ? ? // output written to baseDir/colName/colName.rldict-r-00000 (etc)
? ? ? ? ? ? ? ? ? ? builder.addValue(value);
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? // 直接輸出去重后的維度值
? ? ? ? ? ? ? ? ? ? byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
? ? ? ? ? ? ? ? ? ? // output written to baseDir/colName/-r-00000 (etc)
? ? ? ? ? ? ? ? ? ? String fileName = col.getIdentity() + "/";
? ? ? ? ? ? ? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_COLUMN, NullWritable.get(), new Text(keyBytes), fileName);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? rowCount++;
? ? }
?
? ? protected void doCleanup(Context context) throws IOException, InterruptedException {
? ? ? ? if (isStatistics) {
? ? ? ? ? ? //output the hll info;
? ? ? ? ? ? List<Long> allCuboids = Lists.newArrayList();
? ? ? ? ? ? allCuboids.addAll(cuboidHLLMap.keySet());
? ? ? ? ? ? Collections.sort(allCuboids);
? ? ? ? ? ? logMapperAndCuboidStatistics(allCuboids); // for human check
? ? ? ? ? ? 輸出通過HLL近似算法統計CuboID去重后的行數
? ? ? ? ? ? outputStatistics(allCuboids);
? ? ? ? } else {
? ? ? ? ? ? //dimension col
? ? ? ? ? ? if (cubeDesc.listDimensionColumnsExcludingDerived(true).contains(col)) {
? ? ? ? ? ? ? ? outputDimRangeInfo();
? ? ? ? ? ? }
? ? ? ? ? ? // dic col
? ? ? ? ? ? if (buildDictInReducer) {
? ? ? ? ? ? ? ? Dictionary<String> dict = builder.build();
? ? ? ? ? ? ? ? outputDict(col, dict);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? mos.close();
? ? }
?
? ? private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
? ? ? ? // output written to baseDir/statistics/statistics-r-00000 (etc)
? ? ? ? String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
? ? ? ? ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
? ? ? ? // 獲取進入這個Reduce各個CuboID去重后的最終統計行數
? ? ? ? // mapper overlap ratio at key -1
? ? ? ? long grandTotal = 0;
? ? ? ? for (HLLCounter hll : cuboidHLLMap.values()) {
? ? ? ? ? ? // 累計各個CuboID去重后的最終統計行數
? ? ? ? ? ? grandTotal += hll.getCountEstimate();
? ? ? ? }
? ? ? ??
? ? ? ? // 輸出進入這個Reduce中的各Mapper間的重復度,totalRowsBeforeMerge / grandTotal
? ? ? ? double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
? ? ? ? // ?Mapper數量
? ? ? ? // mapper number at key -2
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
? ? ? ? // 抽樣百分比
? ? ? ? // sampling percentage at key 0
? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
? ? ? ? // 輸出進入這個Reduce的各個cuboId的最終統計結果
? ? ? ? for (long i : allCuboids) {
? ? ? ? ? ? valueBuf.clear();
? ? ? ? ? ? cuboidHLLMap.get(i).writeRegisters(valueBuf);
? ? ? ? ? ? valueBuf.flip();
? ? ? ? ? ? mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
? ? ? ? }
? ? }
2.2 基于維度去重值構建維度字典(Build Dimension Dictionary)(在kylin進程內的一個線程中去創建所有維度的dictionary)
??這一步是根據上一步生成的distinct column文件和維度表計算出所有維度的詞典信息,詞典是為了節約存儲而設計的,用于將一個成員值編碼成一個整數類型并且可以通過整數值獲取到原始成員值,每一個cuboid的成員是一個key-value形式存儲在hbase中,key是維度成員的組合,但是一般情況下維度是一些字符串之類的值(例如商品名),所以可以通過將每一個維度值轉換成唯一整數而減少內存占用,在從hbase查找出對應的key之后再根據詞典獲取真正的成員值。使用字典的好處是有很好的數據壓縮率,可降低存儲空間,同時也提升存儲讀取的速度。缺點是構建字典需要較多的內存資源,創建維度基數超過千萬的容易造成內存溢出。
???這一步是在kylin進程內的一個線程中執行的,它會創建所有維度的dictionary,如果是事實表上的維度則可以從上一步生成的文件中讀取該列的distinct成員值(FileTable),否則則需要從原始的hive表中讀取每一列的信息(HiveTable),根據不同的源(文件或者hive表)獲取所有的列去重之后的成員列表,然后根據這個列表生成dictionary,kylin中針對不同類型的列使用不同的實現方式,對于time之類的(date、time、dtaetime和timestamp)使用DateStrDictionary,這里目前還存在著一定的問題,因為這種編碼方式會首先將時間轉換成‘yyyy-MM-dd’的格式,會導致timestamp之類的精確時間失去天以后的精度。針對數值型的使用NumberDictionary,其余的都使用一般的TrieDictionary(字典樹)。這些dictionary會作為cube的元數據存儲的kylin元數據庫里面,執行query的時候進行轉換。
???針對這一步需要注意的問題:首先,這一步的兩個步驟都是在kylin進程的一個線程中執行的,第一步會加載某一個維度的所有distinct成員到內存,如果某一個維度的基數比較大 ,可能會導致內存出現OOM,然后在創建snapshotTable的時候會限制原始表的大小不能超過配置的一個上限值,如果超過則會執行失敗。但是應該強調的是這里加載全部的原始維度表更可能出現OOM。
CreateDictionaryJob
2.3 保存統計信息(合并保存統計信息及基于上一個HyperLogLog模擬去重統計信息選擇Cube構建算法等)
???針對上一個MR的HyperLogLog模擬去重統計結果文件baseDir/statistics/statistics-r-00000,baseDir/statistics/statistics-r-00001,合并相關統計信息,根據最終重復度選擇Cube構建算法
在FactDistinctColumnsReducer中輸出進入這個Reduce的各個CuboID的統計信息???
private void outputStatistics(List<Long> allCuboids) throws IOException, InterruptedException {
????????// output written to baseDir/statistics/statistics-r-00000 (etc)
????????String statisticsFileName = BatchConstants.CFG_OUTPUT_STATISTICS + "/" + BatchConstants.CFG_OUTPUT_STATISTICS;
????????ByteBuffer valueBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
????????// 獲取進入這個Reduce各個CuboID去重后的最終統計行數
????????// mapper overlap ratio at key -1
????????long grandTotal = 0;
????????for (HLLCounter hll : cuboidHLLMap.values()) {
????????????// 累計各個CuboID去重后的最終統計行數
????????????grandTotal += hll.getCountEstimate();
????????}
????????// 輸出進入這個Reduce中的各Mapper間的重復度,totalRowsBeforeMerge / grandTotal
????????double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal;
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-1), new BytesWritable(Bytes.toBytes(mapperOverlapRatio)), statisticsFileName);
????????//??Mapper數量
????????// mapper number at key -2
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(-2), new BytesWritable(Bytes.toBytes(baseCuboidRowCountInMappers.size())), statisticsFileName);
????????// 抽樣百分比
????????// sampling percentage at key 0
????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(0L), new BytesWritable(Bytes.toBytes(samplingPercentage)), statisticsFileName);
????????// 輸出進入這個Reduce的各個cuboId的最終統計結果
????????for (long i : allCuboids) {
????????????valueBuf.clear();
????????????cuboidHLLMap.get(i).writeRegisters(valueBuf);
????????????valueBuf.flip();
????????????mos.write(BatchConstants.CFG_OUTPUT_STATISTICS, new LongWritable(i), new BytesWritable(valueBuf.array(), valueBuf.limit()), statisticsFileName);
????????}
}
在SaveStatisticsStep保存統計信息任務階段會去讀取上一步任務產出的cuboID統計結果文件,產出最終統計信息保存到元數據引擎中并且根據各個Mapper重復度選擇Cube構建算法。
?Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
?long totalRowsBeforeMerge = 0;
?long grantTotal = 0;
?int samplingPercentage = -1;
?int mapperNumber = -1;
?for (Path item : statisticsFiles) {
?// 讀取解析統計文件
CubeStatsReader.CubeStatsResult cubeStatsResult = new CubeStatsReader.CubeStatsResult(item,
????????????????????????kylinConf.getCubeStatsHLLPrecision());????????????
????????????????// 獲取各個CuboID的計數器
????????????????cuboidHLLMap.putAll(cubeStatsResult.getCounterMap());
????????????????long pGrantTotal = 0L;
????????????????for (HLLCounter hll : cubeStatsResult.getCounterMap().values()) {
????????????????????pGrantTotal += hll.getCountEstimate();
????????????????}????????????????
????????????????// 累計所有Mapper輸出的cuboID行數
????????????????totalRowsBeforeMerge += pGrantTotal * cubeStatsResult.getMapperOverlapRatio();
????????????????// 累計去重后的cuboID統計行數
????????????????grantTotal += pGrantTotal;
????????????double mapperOverlapRatio = grantTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grantTotal;
????????????CubingJob cubingJob = (CubingJob) getManager()
????????????????????.getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
????????????// fact源數據行數
????????????long sourceRecordCount = cubingJob.findSourceRecordCount();
???????????
????????????// 保存CuboID最終統計信息到最終統計文件cuboid_statistics.seq中
????????????// cuboidHLLMap CuboID的統計信息
????????????// samplingPercentage 抽樣百分比
????????????// mapperNumber Mapper數
????????????// mapperOverlapRatio 各個Mapper間的重復度
????????????// sourceRecordCount fact源數據行數
????????????CubeStatsWriter.writeCuboidStatistics(hadoopConf, statisticsDir, cuboidHLLMap, samplingPercentage,mapperNumber, mapperOverlapRatio, sourceRecordCount);
????????????Path statisticsFile = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
????????????logger.info(newSegment + " stats saved to hdfs " + statisticsFile);
????????????FSDataInputStream is = fs.open(statisticsFile);
????????????try {
?
????????????????// put the statistics to metadata store
????????????????// 把統計信息存儲到kylin的元數據引擎中
????????????????String resPath = newSegment.getStatisticsResourcePath();
????????????????rs.putResource(resPath, is, System.currentTimeMillis());
????????????????logger.info(newSegment + " stats saved to resource " + resPath);
????????????????// 根據抽樣數據計算重復度,選擇Cube構建算法,如mapperOverlapRatio > 7 選逐層算法,否則選快速算法
????????????????StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, newSegment);
????????????????StatisticsDecisionUtil.optimizeCubingPlan(newSegment);
????????????} finally {
????????????????IOUtils.closeStream(is);
}
用戶該如何選擇算法呢?無需擔心,Kylin會自動選擇合適的算法。Kylin在計算Cube之前對數據進行采樣,在“fact distinct”步,利用HyperLogLog模擬去重,估算每種組合有多少不同的key,從而計算出每個Mapper輸出的數據大小,以及所有Mapper之間數據的重合度,據此來決定采用哪種算法更優。在對上百個Cube任務的時間做統計分析后,Kylin選擇了7做為默認的算法選擇閥值(參數kylin.cube.algorithm.layer-or-inmem-threshold):如果各個Mapper的小Cube的行數之和,大于reduce后的Cube行數的7倍,采用Layered Cubing, 反之采用Fast Cubing。如果用戶在使用過程中,更傾向于使用Fast Cubing,可以適當調大此參數值,反之調小。
org.apache.kylin.engine.mr.steps.SaveStatisticsStep???????????????
?int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit();
????????????????double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); // 默認7
????????????????logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
????????????????logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is "+ overlapThreshold);
????????????????// in-mem cubing is good when
????????????????// 1) the cluster has enough mapper slots to run in parallel
????????????????// 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage
????????????????alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)//
????????????????????????? CubingJob.AlgorithmEnum.INMEM ????// 快速算法
????????????????????????: CubingJob.AlgorithmEnum.LAYER; ???// 逐層算法
三、?構建Cube
3.1 計算BaseCuboid文件(Build Base Cuboid Data)(執行一個MapReduce任務)
???何謂Base cuboid呢?假設一個cube包含了四個維度:A/B/C/D,那么這四個維度成員間的所有可能的組合就是base cuboid,這就類似在查詢的時候指定了select count(1) from xxx group by A,B,C,D;這個查詢結果的個數就是base cuboid集合的成員數。這一步也是通過一個MR任務完成的,輸入是臨時表的路徑和分隔符,map對于每一行首先進行split,然后獲取每一個維度列的值組合作為rowKey,但是rowKey并不是簡單的這些維度成員的內容組合,而是首先將這些內容從dictionary中查找出對應的id,然后組合這些id得到rowKey,這樣可以大大縮短hbase的存儲空間,提升查找性能。然后在查找該行中的度量列。這個MR任務還會執行combiner過程,執行邏輯和reducer相同,在reducer中的key是一個rowKey,value是相同的rowKey的measure組合的數組,reducer會分解出每一個measure的值,然后再根據定義該度量使用的聚合函數計算得到這個rowKey的結果,其實這已經類似于hbase存儲的格式了。
org.apache.kylin.engine.mr.steps.BaseCuboidJob
org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
3.2 計算第N層cuboid文件(Build N-Dimension Cuboid Data)(執行N個MapReduce任務)
??這一個流程是由多個步驟的,它是根據維度組合的cuboid的總數決定的,上一層cuboid執行MR任務的輸入是下一層cuboid計算的輸出,由于最底層的cuboid(base)已經計算完成,所以這幾步不需要依賴于任何的hive信息,它的reducer和base cuboid的reducer過程基本一樣的(相同rowkey的measure執行聚合運算),mapper的過程只需要根據這一行輸入的key(例如A、B、C、D中某四個成員的組合)獲取可能的下一層的的組合(例如只有A、B、C和B、C、D),那么只需要將這些可能的組合提取出來作為新的key,value不變進行輸出就可以了。
舉個例子,假設一共四個維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個measure(對于這列V,計算sum(V)),這里忽略dictionary編碼。原始表如下:
A
B
C
D
V
A1
B1
C1
D1
2
A1
B2
C1
D1
3
A2
B1
C1
D1
5
A3
B1
C1
D1
6
A3
B2
C1
D1
8
那么base cuboid最終的輸出如下
(<A1、B1、C1、D1>、2)
(<A1、B2、C1、D1>, 3)
(<A2、B1、C1、D1>, 5)
(<A3、B1、C1、D1>, 6)
(<A3、B2、C1、D1>, 8)
那么它作為下面一個cuboid的輸入,對于第一行輸入
(<A1、B1、C1、D1>, 2),mapper執行完成之后會輸出
(<A1、B1、C1>, 2)、
(<A1、B1、D1>, 2)、
(<A1、C1、D1>, 2)、
(<B1、C1、D1>, 2)這四項,
同樣對于其他的內一行也會輸出四行,最終他們經過reducer的聚合運算,得到如下的結果:
(<A1、B1、C1>, 2)
(<A1、B1、D1>, 2)
(<A1、C1、D1>, 2 + 3)
(<B1、C1、D1>,2 + 5 +6)
???...
這樣一次將下一層的結果作為輸入計算上一層的cuboid成員,直到最頂層的cuboid,這一個層cuboid只包含一個成員,不按照任何維度進行group by。
?????上面的這些步驟用于生成cuboid,假設有N個維度(對于特殊類型的),那么就需要有N +1層cuboid,每一層cuboid可能是由多個維度的組合,但是它包含的維度個數相同。
org.apache.kylin.engine.mr.steps.NDCuboidJob
org.apache.kylin.engine.mr.steps.NDCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
3.3 創建HTable
??在上面幾步中,我們已經將每一層的cuboid計算完成,每一層的cuboid文件都是一些cuboid的集合,每一層的cuboid的key包含相同的維度個數,下面一步就是將這些cuboid文件導入到hbase中,根據上一步計算出的rowKey分布情況(split數組)創建HTable,創建一個HTable的時候還需要考慮一下幾個事情:1、列組的設置,2、每一個列組的壓縮方式,3、部署coprocessor,4、HTable中每一個region的大小。在這一步中,列組的設置是根據用戶創建cube時候設置的,在hbase中存儲的數據key是維度成員的組合,value是對應聚合函數的結果,列組針對的是value的,一般情況下在創建cube的時候只會設置一個列組,該列包含所有的聚合函數的結果;在創建HTable時默認使用LZO壓縮,如果不支持LZO則不進行壓縮,在后面kylin的版本中支持更多的壓縮方式;kylin強依賴于hbase的coprocessor,所以需要在創建HTable為該表部署coprocessor,這個文件會首先上傳到HBase所在的HDFS上,然后在表的元信息中關聯,這一步很容易出現錯誤,例如coprocessor找不到了就會導致整個regionServer無法啟動,所以需要特別小心;region的劃分已經在上一步確定了,所以這里不存在動態擴展的情況,所以kylin創建HTable使用的接口如下:
public void createTable( final HTableDescriptor desc , byte [][] splitKeys)。
CreateHTableJob
3.4 轉換HFile文件
??創建完了HTable之后一般會通過插入接口將數據插入到表中,但是由于cuboid中的數據量巨大,頻繁的插入會對Hbase的性能有非常大的影響,所以kylin采取了首先將cuboid文件轉換成HTable格式的HFile文件,然后在通過bulkLoad的方式將文件和HTable進行關聯,這樣可以大大降低Hbase的負載,這個過程通過一個MR任務完成。
??這個任務的輸入是所有的cuboid文件,在mapper階段根據每一個cuboid成員的key-value輸出,如果cube定義時指定了多個列組,那么同一個key要按照不同列組中的值分別輸出,例如在cuboid文件中存在一行cuboid=1,key=1,value=sum(cost),count(1)的數據,而cube中將這兩個度量劃分到兩個列組中,這時候對于這一行數據,mapper的輸出為<1, sum(cost)>和<1,count(1)>。reducer使用的是org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer,它會按照行排序輸出,如果一行中包含多個值,那么會將這些值進行排序再輸出。輸出的格式則是根據HTable的文件格式定義的。
CubeHFileJob
3.5 BulkLoad文件
??這一步將HFile文件load到HTable中,因為load操作會將原始的文件刪除(相當于remove),在操作之前首先將所有列組的Hfile的權限都設置為777,然后再啟動LoadIncrementalHFiles任務執行load操作,它的輸入為文件的路徑和HTable名,這一步完全依賴于HBase的工具。這一步完成之后,數據已經存儲到HBase中了,key的格式由cuboid編號+每一個成員在字典樹的id組成,value可能保存在多個列組里,包含在原始數據中按照這幾個成員進行GROUP BY計算出的度量的值。
BulkLoadJob
四、?收尾工作
??執行完上一步就已經完成了從輸入到輸出的計算過程,接下來要做的就是一些kylin內部的工作,分別是更新Cube元數據,更新cube狀態,臨時數據清理。
4.1 更新Cube元數據信息
??這一步主要是更新cube的狀態,其中需要更新的包括cube是否可用、以及本次構建的數據統計,包括構建完成的時間,輸入的record數目,輸入數據的大小,保存到Hbase中數據的大小等,并將這些信息持久到元數據庫中。
UpdateCubeInfoAfterBuildStep
4.2 清理臨時數據
??這一步是否成功對正確性不會有任何影響,因為經過上一步之后這個segment就可以在這個cube中被查找到了,但是在整個執行過程中產生了很多的垃圾文件,其中包括:1、臨時的hive表,2、因為hive表是一個外部表,存儲該表的文件也需要額外刪除,3、fact distinct 這一步將數據寫入到HDFS上為建立詞典做準備,這時候也可以刪除了,4、rowKey統計的時候會生成一個文件,此時可以刪除。5、生成HFile時文件存儲的路徑和hbase真正存儲的路徑不同,雖然load是一個remove操作,但是上層的目錄還是存在的,也需要刪除。這一步kylin做的比較簡單,并沒有完全刪除所有的臨時文件,其實在整個計算過程中,真正還需要保留的數據只有多個cuboid文件(需要增量build的cube),這個因為在不同segment進行merge的時候是基于cuboid文件的,而不是根據HTable的。
GarbageCollectionStep
Cuboid 的維度和指標如何轉換為HBase的KV結構
簡單的說Cuboid的維度會映射為HBase的Rowkey,Cuboid的指標會映射為HBase的Value。如下圖所示:?圖2
如上圖原始表所示:Hive表有兩個維度列year和city,有一個指標列price。
如上圖預聚合表所示:我們具體要計算的是year和city這兩個維度所有維度組合(即4個cuboid)下的sum(priece)指標,這個指標的具體計算過程就是由MapReduce完成的。
如上圖字典編碼所示:為了節省存儲資源,Kylin對維度值進行了字典編碼。圖中將beijing和shanghai依次編碼為0和1。
如上圖HBase KV存儲所示:在計算cuboid過程中,會將Hive表的數據轉化為HBase的KV形式。Rowkey的具體格式是cuboid id + 具體的維度值(最新的Rowkey中為了并發查詢還加入了ShardKey),以預聚合表內容的第2行為例,其維度組合是(year,city),所以cuboid id就是00000011,cuboid是8位,具體維度值是1994和shanghai,所以編碼后的維度值對應上圖的字典編碼也是11,所以HBase的Rowkey就是0000001111,對應的HBase Value就是sum(priece)的具體值。
所有的cuboid計算完成后,會將cuboid轉化為HBase的KeyValue格式生成HBase的HFile,最后將HFile load進cube對應的HBase表中。
Cube 構建過程重要源碼分析
1 從Hive表生成Base Cuboid
在實際的cube構建過程中,會首先根據cube的Hive事實表和維表生成一張大寬表,然后計算大寬表列的基數,建立維度字典,估算cuboid的大小,建立cube對應的HBase表,再計算base cuboid。
計算base cuboid就是一個MapReduce作業,其輸入是上面提到的Hive大寬表,輸出的是key是各種維度組合,value是Hive大寬表中指標的值。
org.apache.kylin.engine.mr.steps.BaseCuboidJob
org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
map階段生成key-value的代碼如下:???
public void doMap(KEYIN key, Object value, Context context) throws IOException, InterruptedException {
????????Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(value);
????????for (String[] row: rowCollection) {
????????????try {
????????????????outputKV(row, context);
????????????} catch (Exception ex) {
????????????????handleErrorRecord(row, ex);
????????????}
????????}
?
????}
2 從Base Cuboid 逐層計算 Cuboid(Cube構建算法-逐層算法)
從base cuboid 逐層計算每層的cuboid,也是MapReduce作業,map階段每層維度數依次減少。
org.apache.kylin.engine.mr.steps.NDCuboidJob
org.apache.kylin.engine.mr.steps.NDCuboidMapper
org.apache.kylin.engine.mr.steps.CuboidReducer
????????public void doMap(Text key, Text value, Context context) throws Exception {
????????????long cuboidId = rowKeySplitter.split(key.getBytes());
????????????Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
????????????/**
?????????????* Build N-Dimension Cuboid
??????????????## 構建N維cuboid
??????????????這些步驟是“逐層”構建cube的過程,每一步以前一步的輸出作為輸入,然后去掉一個維度以聚合得到一個子cuboid。舉個例子,cuboid ABCD去掉A得到BCD,去掉B得到ACD。
??????????????有些cuboid可以從一個以上的父cuboid聚合得到,這種情況下,Kylin會選擇最小的一個父cuboid。舉例,AB可以從ABC(id:1110)和ABD(id:1101)生成,則ABD會被選中,因為它的比ABC要小。
??????????????在這基礎上,如果D的基數較小,聚合運算的成本就會比較低。所以,當設計rowkey序列的時候,請記得將基數較小的維度放在末尾。這樣不僅有利于cube構建,而且有助于cube查詢,因為預聚合也遵循相同的規則。
??????????????通常來說,從N維到(N/2)維的構建比較慢,因為這是cuboid數量爆炸性增長的階段:N維有1個cuboid,(N-1)維有N個cuboid,(N-2)維有(N-2)*(N-1)個cuboid,以此類推。經過(N/2)維構建的步驟,整個構建任務會逐漸變快。
?????????????*/
????????????Collection<Long> myChildren = cuboidScheduler.getSpanningCuboid(cuboidId);
????????????// if still empty or null
????????????if (myChildren == null || myChildren.size() == 0) {
????????????????context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Skipped records").increment(1L);
????????????????if (skipCounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????????logger.info("Skipping record with ordinal: " + skipCounter);
????????????????}
????????????????return;
????????????}???????????
????????????context.getCounter(BatchConstants.MAPREDUCE_COUNTER_GROUP_NAME, "Processed records").increment(1L);
????????????Pair<Integer, ByteArray> result;
????????????for (Long child : myChildren) {
????????????????Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
????????????????result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
????????????????outputKey.set(result.getSecond().array(), 0, result.getFirst());
????????????????context.write(outputKey, value);
????????????}?????????
????????}
從base cuboid 逐層計算每層的cuboid,也是MapReduce作業,map階段每層維度數依次減少,reduce階段對指標進行聚合。
org.apache.kylin.engine.mr.steps.CuboidReducer
????public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
????????aggs.reset();??//MeasureAggregators 根據每種指標的不同類型對指標進行聚合
????????for (Text value : values) {
????????????codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
????????????if (cuboidLevel > 0) { // Base Cuboid 的 cuboidLevel 是0
????????????????aggs.aggregate(input, needAggr); //指標進行進一步聚合
????????????} else {
????????????????aggs.aggregate(input);
????????????}
????????}
????????aggs.collectStates(result);
????????ByteBuffer valueBuf = codec.encode(result);
????????outputValue.set(valueBuf.array(), 0, valueBuf.position());
????????context.write(key, outputValue);
}?
3 讀取Hive寬表直接在Mapper端預聚合構建完整Cube(Cube構建算法-快速算法)
快速算法的核心思想是清晰簡單的,就是最大化利用Mapper端的CPU和內存,對分配的數據塊,將需要的組合全都做計算后再輸出給Reducer;由Reducer再做一次合并(merge),從而計算出完整數據的所有組合。如此,經過一輪Map-Reduce就完成了以前需要N輪的Cube計算。本質就是在Mapper端基于內存提前做預聚合。
org.apache.kylin.engine.mr.steps.InMemCuboidJob
org.apache.kylin.engine.mr.steps.InMemCuboidMapper
org.apache.kylin.engine.mr.steps.InMemCuboidReducer
map階段生成key-value的代碼如下:
????public void doMap(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
????????// put each row to the queue
????????T row = getRecordFromKeyValue(key, value);
????????if (offer(context, row, 1, TimeUnit.MINUTES, 60)) {
????????????counter++;
????????????countOfLastSplit++;
????????????if (counter % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????logger.info("Handled " + counter + " records, internal queue size = " + queue.size());
????????????}
????????} else {
????????????throw new IOException("Failed to offer row to internal queue due to queue full!");
????????}
????????if (counter % unitRows == 0 && shouldCutSplit(nSplit, countOfLastSplit)) {
????????????if (offer(context, inputConverterUnit.getCutRow(), 1, TimeUnit.MINUTES, 60)) {
????????????????countOfLastSplit = 0;
????????????} else {
????????????????throw new IOException("Failed to offer row to internal queue due to queue full!");
????????????}
????????????nSplit++;
????????}
}
?
reduce階段整體合并的代碼如下:
????public void doReduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
????????aggs.reset();
????????for (ByteArrayWritable value : values) {
????????????if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
????????????????logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
????????????}
????????????codec.decode(value.asBuffer(), input);
????????????aggs.aggregate(input);
????????}
????????aggs.collectStates(result);
????????// output key
????????outputKey.set(key.array(), key.offset(), key.length());
????????// output value
????????ByteBuffer valueBuf = codec.encode(result);
????????outputValue.set(valueBuf.array(), 0, valueBuf.position());
????????context.write(outputKey, outputValue);
????}
4 Cuboid 轉化為HBase的HFile。
主要就是數據格式的轉化。詳情請參考:?Hive 數據 bulkload 導入 HBase
不同類型的指標是如何進行聚合的
每種不同的指標都會有對應的聚合算法,所有指標聚合的基類是org.apache.kylin.measure.MeasureAggregator。其核心方法如下:???
? ? abstract public void reset();
????//不同類型的指標算法會實現該方法
????abstract public void aggregate(V value);
????abstract public V getState();
?
以最簡單的long類型的sum指標為例:???
public class LongSumAggregator extends MeasureAggregator<LongMutable> {
????????LongMutable sum = new LongMutable();
????????@Override
????????public void reset() {
????????????sum.set(0);
????????}
????????@Override
????????public void aggregate(LongMutable value) {
????????????sum.set(sum.get() + value.get());
????????}
????????@Override
????????public LongMutable getState() {
????????????return sum;
????????}
}
SQL查詢是如何轉化為HBase的Scan操作的
還是以圖2舉例,假設查詢SQL如下:???
select year, sum(price)
from table
where city = "beijing"
group by year
這個SQL涉及維度year和city,所以其對應的cuboid是00000011,又因為city的值是確定的beijing,所以在Scan HBase時就會Scan Rowkey以00000011開頭且city的值是beijing的行,取到對應指標sum(price)的值,返回給用戶。
總結
本文主要介紹了Apache Kylin是如何將Hive表中的數據轉化為HBase的KV結構,并簡單介紹了Kylin的SQL查詢是如何轉化為HBase的Scan操作。希望對大家有所幫助。
---------------------?
Kylin三大引擎和Cube構建源碼解析
????最近在工作中用到了kylin,相關資料還不是很多,關于源碼的更是少之又少,于是結合《kylin權威指南》、《基于Apache Kylin構建大數據分析平臺》、相關技術博客和自己對部分源碼的理解進行了整理。
一、工作原理
每一個Cube都可以設定自己的數據源、計算引擎和存儲引擎,這些設定信息均保存在Cube的元數據中。在構建Cube時,首先由工廠類創建數據源、計算引擎和存儲引擎對象。這三個對象獨立創建,相互之間沒有關聯。
要把它們串聯起來,使用的是適配器模式。計算引擎好比是一塊主板,主控整個Cube的構建過程。它以數據源為輸入,以存儲為Cube的輸出,因此也定義了IN和OUT兩個接口。數據源和存儲引擎則需要適配IN和OUT,提供相應的接口實現,把自己接入計算引擎,適配過程見下圖。適配完成之后,數據源和存儲引擎即可被計算引擎調用。三大引擎連通,就能協同完成Cube構建。
計算引擎只提出接口需求,每個接口都可以有多種實現,也就是能接入多種不同的數據源和存儲。類似的,每個數據源和存儲也可以實現多個接口,適配到多種不同的計算引擎上。三者之間是多對多的關系,可以任意組合,十分靈活。
二、三大主要接口
(一)數據源接口ISource
·adaptToBuildEngine:適配指定的構建引擎接口。返回一個對象,實現指定的IN接口。該接口主要由計算引擎調用,要求數據源向計算引擎適配。如果數據源無法提供指定接口的實現,則適配失敗,Cube構建將無法進行。
·createReadableTable:返回一個ReadableTable,用來順序讀取一個表。除了計算引擎之外,有時也會調用此方法順序訪問數據維表的內容,用來創建維度字典或維表快照。
(二)存儲引擎接口IStorage
·adaptToBuildEngine:適配指定的構建引擎接口。返回一個對象,實現指定的OUT接口。該接口主要由計算引擎調用,要求存儲引擎向計算引擎適配。如果存儲引擎無法提供指定接口的實現,則適配失敗,Cube構建無法進行。
·createQuery:創建一個查詢對象IStorageQuery,用來查詢給定的IRealization。簡單來說,就是返回一個能夠查詢指定Cube的對象。IRealization是在Cube之上的一個抽象。其主要的實現就是Cube,此外還有被稱為Hybrid的聯合Cube。
(三)計算引擎接口IBatchCubingEngine
·createBatchCubingJob:返回一個工作流計劃,用以構建指定的CubeSegment。這里的CubeSegment是一個剛完成初始化,但還不包含數據的CubeSegment。返回的DefaultChainedExecutable是一個工作流的描述對象。它將被保存并由工作流引擎在稍后調度執行,從而完成Cube的構建。
·createBatchMergeJob:返回一個工作流計劃,用以合并指定的CubeSegment。這里的CubeSegment是一個待合并的CubeSegment,它的區間橫跨了多個現有的CubeSegment。返回的工作流計劃一樣會在稍后被調度執行,執行的過程會將多個現有的CubeSegment合并為一個,從而降低Cube的碎片化成都。
·getSourceInterface:指明該計算引擎的IN接口。
·getStorageInterface:指明該計算引擎的OUT接口。
三、三大引擎互動過程
1.Rest API接收到構建(合并)CubeSegment的請求。
2.EngineFactory根據Cube元數據的定義,創建IBatchCubingEngine對象,并調用其上的createBatchCubingJob(或者createBatchMergeJob)方法。
3.IBatchCubingEngine根據Cube元數據的定義,通過SourceFactory和StorageFactory創建出相應的數據源ISource和存儲IStorage對象。
4.IBatchCubingEngine調用ISource上的adaptToBuildEngine方法傳入IN接口,要求數據源向自己適配。
5.IBatchCubingEngine調用IStorage上的adaptToBuildEngine方法,傳入OUT接口,要求存儲引擎向自己適配。
6.適配成功后,計算引擎協同數據源和存儲引擎計劃Cube構建的具體步驟,將結果以工作流的形式返回。
7.執行引擎將在稍后執行工作流,完成Cube構建。
四、Kylin默認三大引擎Hive+MapReduce+HBase的介紹和代碼實現
(一)構建引擎MapReduce
每一個構建引擎必須實現接口IBatchCubingEngine,并在EngineFactory中注冊實現類。只有這樣才能在Cube元數據中引用該引擎,否則會在構建Cube時出現“找不到實現”的錯誤。
注冊的方法是通過kylin.properties來完成的。在其中添加一行構建引擎的聲明。比如:
EngineFactory在啟動時會讀取kylin.properties,默認引擎即為標號2的MRBatchCubingEngine2這個引擎。
1.MRBatchCubingEngine2
這是一個入口類,構建Cube的主要邏輯都封裝在BatchCubingJobBuilder2和BatchMergeJobBuilder2中。其中的DefaultChainedExecutable,代表了一種可執行的對象,其中包含了很多子任務。它執行的過程就是一次串行執行每一個子任務,直到所有子任務都完成。kylin的構建比較復雜,要執行很多步驟,步驟之間有直接的依賴性和順序性。DefaultChainedExecutable很好地抽象了這種連續依次執行的模型,可以用來表示Cube的構建的工作流。
另外,重要的輸入輸出接口也在這里進行聲明。IMRInput是IN接口,由數據源適配實現;IMROutput2是OUT接口,由存儲引擎適配實現。
2.BatchCubingJobBuilder2
BatchCubingJobBuilder2和BatchMergeJobBuilder2大同小異,這里以BatchCubingJobBuilder2為例。
BatchCubingJobBuilder2中的成員變量IMRBatchCubingInputSide inputSide和IMRBatchCubingOutputSide2 outputSide分別來自數據源接口IMRInput和存儲接口IMROutput2,分別代表著輸入和輸出兩端參與創建工作流。
BatchCubingJobBuilder2的主體函數build()中,整個Cube構建過程是一個子任務一次串行執行的過程,這些子任務又被分為四個階段。
第一階段:創建平表。
這一階段的主要任務是預計算連接運算符,把事實表和維表連接為一張大表,也稱為平表。這部分工作可通過調用數據源接口來完成,因為數據源一般有現成的計算表連接方法,高效且方便,沒有必要在計算引擎中重復實現。
第二階段:創建字典。
創建字典由三個子任務完成,由MR引擎完成,分別是抽取列值、創建字典和保存統計信息。是否使用字典是構建引擎的選擇,使用字典的好處是有很好的數據壓縮率,可降低存儲空間,同時也提升存儲讀取的速度。缺點是構建字典需要較多的內存資源,創建維度基數超過千萬的容易造成內存溢出。
第三階段:構建Cube。
其中包含兩種構建cube的算法,分別是分層構建和快速構建。對于不同的數據分布來說它們各有優劣,區別主要在于數據通過網絡洗牌的策略不同。兩種算法的子任務將全部被加入工作流計劃中,在執行時會根據源數據的統計信息自動選擇一種算法,未被選擇的算法的子任務將被自動跳過。在構建cube的最后還將調用存儲引擎的接口,存儲引擎負責將計算完的cube放入引擎。
第四階段:更新元數據和清理。
最后階段,cube已經構建完畢,MR引擎將首先添加子任務更新cube元數據,然后分別調用數據源接口和存儲引擎接口對臨時數據進行清理。
3.IMRInput
這是BatchCubingJobBuilder2對數據源的要求,所有希望接入MRBatchCubingEngine2的數據源都必須實現該接口。
·getTableInputFormat方法返回一個IMRTableInputFormat對象,用以幫助MR任務從數據源中讀取指定的關系表,為了適應MR編程接口,其中又有兩個方法,configureJob在啟動MR任務前被調用,負責配置所需的InputFormat,連接數據源中的關系表。由于不同的InputFormat所讀入的對象類型各不相同,為了使得構建引擎能夠統一處理,因此又引入了parseMapperInput方法,對Mapper的每一行輸入都會調用該方法一次,該方法的輸入是Mapper的輸入,具體類型取決于InputFormat,輸出為統一的字符串數組,每列為一個元素。整體表示關系表中的一行。這樣Mapper救能遍歷數據源中的表了。
·getBatchCubingInputSide方法返回一個IMRBatchCubingInputSide對象,參與創建一個CubeSegment的構建工作流,它內部包含三個方法,addStepPhase1_CreateFlatTable()方法由構建引擎調用,要求數據源在工作流中添加步驟完成平表的創建;getFlatTableInputFormat()方法幫助MR任務讀取之前創建的平表;addStepPhase4_Cleanup()是進行清理收尾,清除已經沒用的平表和其它臨時對象,這三個方法將由構建引擎依次調用。
4.IMROutput2
這是BatchCubingJobBuilder2對存儲引擎的要求,所有希望接入BatchCubingJobBuilder2的存儲都必須實現該接口。
IMRBatchCubingOutputSide2代表存儲引擎配合構建引擎創建工作流計劃,該接口的內容如下:
·addStepPhase2_BuildDictionary:由構建引擎在字典創建后調用。存儲引擎可以借此機會在工作流中添加步驟完成存儲端的初始化或準備工作。
·addStepPhase3_BuildCube:由構建引擎在Cube計算完畢之后調用,通知存儲引擎保存CubeSegment的內容。每個構建引擎計算Cube的方法和結果的存儲格式可能都會有所不同。存儲引擎必須依照數據接口的協議讀取CubeSegment的內容,并加以保存。
·addStepPhase4_Cleanup:由構建引擎在最后清理階段調用,給存儲引擎清理臨時垃圾和回收資源的機會。
(二)數據源Hive
Hive是kylin的默認數據源,由于數據源的實現依賴構建引擎對輸入接口的定義,因此本節的具體內容只適用于MR引擎。
數據源HiveSource首先要實現ISource接口。
HiveSource實現了ISource接口中的方法。adaptToBuildEngine()只能適配IMRInput,返回HiveMRInput實例。另一個方法createReadableTable()返回一個ReadableTable對象,提供讀取一張hive表的能力。
HiveMRInput
HiveMRInput實現了IMRInput接口,實現了它的兩個方法。
一是HiveTableInputFormat實現了IMRTableInputFormat接口,它主要使用了HCatInputFormat作為MapReduce的輸入格式,用通用的方式讀取所有類型的Hive表。Mapper輸入對象為DefaultHCatRecord,統一轉換為String[]后交由構建引擎處理。
二是BatchCubingInputSide實現了IMRBatchCubingInputSide接口。主要實現了在構建的第一階段創建平表的步驟。首先用count(*)查詢獲取Hive平表的總行數,然后用第二句HQL創建Hive平表,同時添加參數根據總行數分配Reducer數目。
(三)存儲引擎HBase
存儲引擎HBaseStorage實現了IStorage接口。
·createQuery方法,返回指定IRealization(數據索引實現)的一個查詢對象。因為HBase存儲是為Cube定制的,所以只支持Cube類型的數據索引。具體的IStorageQuery實現應根據存儲引擎的版本而有所不同。
·adaptToBuildEngine方法,適配IMROutput2的輸出接口。
HBaseMROutput2
觀察IMRBatchCubingOutputSide2的實現。它在兩個時間點參與Cube構建的工作流。一是在字典創建之后(Cube構造之前),在addStepPhase2_BuildDictionary()中添加了“創建HTable”這一步,估算最終CubeSegment的大小,并以此來切分HTable Regions,創建HTable。
第二個插入點是在Cube計算完畢之后,由構建引擎調用addStepPhase3_BuildCube()。這里要將Cube保存為HTable,實現分為“轉換HFile”和“批量導入到HTable”兩步。因為直接插入HTable比較緩慢,為了最快速地將數據導入到HTable,采取了Bulk Load的方法。先用一輪MapReduce將Cube數據轉換為HBase的存儲文件格式HFile,然后就可以直接將HFile導入空的HTable中,完成數據導入。
最后一個插入點是addStepPhase4_Cleanup()是空實現,對于HBase存儲來說沒有需要清理的資源。
五、CubingJob的構建過程
在Kylin構建CubeSegment的過程中,計算引擎居于主導地位,通過它來協調數據源和存儲引擎。
在網頁上向Kylin服務端發送構建新的CubeSegment的請求后,通過controller層來到service層,進入JobService類中的submitJob方法,方法內部再調用submitJobInternal方法,在build、merge和refresh的時候,通過EngineFactory.createBatchCubingJob(newSeg, submitter)返回一個job實例,從這里可以看出,CubingJob的構建入口是由計算引擎提供的,即默認的計算引擎MRBatchCubingEngine2。
Kylin所支持的所有計算引擎,都會在EngineFactory中注冊,并保存在batchEngine中,可以通過配置文件kylin.properties選擇計算引擎,目前Kylin支持的計算引擎有:
MRBatchCubingEngine2實現了createBatchCubingJob方法,方法內調用了BatchCubingJobBuild2的build方法。
在new的初始化過程中,super(newSegment,submitter)就是執行父類的構造方法,進行了一些屬性的初始化賦值,其中的inputSide和outputSide就上上文提到的數據源和存儲引擎實例,通過計算引擎的協調來進行CubingJob的構建。
數據源inputSide實例獲取:
以上即為數據源實例獲取過程的代碼展現,BatchCubingJobBuilder2初始化的時候,調用MRUtil的getBatchCubingInputSide方法,它最終調用的其實還是MRBatchCubingEngine2這個計算引擎的getJoinedFlatTableDesc方法,它返回了一個IJoinedFlatTableDesc實例,這個對象就是對數據源表信息的封裝。獲得了這個flatDesc實例之后,就要來獲取inputSide實例,與獲取計算引擎代碼類似,目前kylin中支持的數據源有:
Kylin默認的數據源是序號為0的HiveSource,所以最后調用的是HiveSource的adaptToBuildEngine,根據傳入的IMRInput.class接口,最終返回得到HiveMRInput的實例,最后再通過它的getBatchCubingInputSide的方法獲取inputSide的實例。
存儲引擎outputSide實例獲取:
以上即為存儲引擎實例獲取的代碼展現,BatchCubingJobBuilder2初始化的時候,調用MRUtil的getBatchCubingOutputSide方法,方法內先調用了StorageFactory類的createEngineAdapter方法,方法內又調用實現了Storage接口的HBaseStorage類的adaptToBuildEngine方法,最后返回了HBaseMROutput2Transition實例,然后在通過它的getBatchCubingOutputSide方法就可以獲取到outputSide的實例。目前kylin中支持的數據源有:
kylin默認的存儲引擎是HBase。
——————————————————————————————————
通過構造函數,數據源、計算引擎和數據存儲三個模塊已經關聯到一起了,上文介紹到的MRBatchCubingEngine2的方法中,在new出了一個BatchCubingJobBuild2實例后,接著就調用了build方法,最后返回了一個CubingJob實例。build方法邏輯如下:
方法的內容就是構建一個CubeSegment的步驟,依次順序的加入到CubingJob的任務list中。
從第一行開始,調用了CubingJob的createBuildJob方法,里面又調用了initCubingJob方法。
initCubingJob方法就是獲取到cube相關的一些配置信息進行初始化,它是根據cube的名字去查詢所在的project,如果不同的project下創建了相同名字的cube,那返回的就會是一個List,然后看配置文件中是否開啟了允許cube重名,如不允許則直接拋出異常,如果允許就在設置projectName時取返回List中的第一個元素,那么這里就可能導致projectName設置錯誤,所以最好保證cube的name是全局唯一的。
在CubingJob初始化之后,會獲取cuboidRootPath,獲取邏輯如下:
經過一連串的調用拼裝,最終獲取的路徑格式如下:
hdfs:///kylin/kylin_metadata/kylin-jobId/cubeName/cuboid
接下來就是三大引擎相互協作,構建CubeSegment的過程,整個過程大致分為創建hive平表、創建字典、構建Cube和更新元數據和清理這四個步驟。
第一步和第四步是由數據源來實現的,具體是在HiveMRInput類實現了IMRInput接口的getBatchCubingInputSide方法中,它返回了一個BatchCubingInputSide實例,在這個類中完成了具體工作;第二步是由計算引擎實現的,依靠JobBuilderSupport類中的方法完成;第三步是由計算引擎和存儲引擎共同完成的,包括構建cube和存儲到HBase;第四步是由數據源和存儲引擎分別完成的;我們按步驟對代碼進行分析。
首先是第一步創建hive平表調用了HiveMRInput類中的靜態內部類BatchCubingInputSide中的addStepPhase1_CreateFlatTable方法。
先獲取cubeName、cubeConfig、hive命令(USE faltTableDatabase)三個變量。
接下來的方法就是抽取變量,進行hive命令的拼接,完成以下步驟:
一是從hive表中,將所需字段從事實表和維表中提取出來,構建一個寬表;
二是將上一步得到的寬表,按照某個字段進行重新分配,如果沒有指定字段,則隨機,目的是產生多個差不多大小的文件,作為后續構建任務的輸入,防止數據傾斜。
三是將hive中的視圖物化。
——————————————————————————————————
創建平表命令例子:
hive -e "USE default;
DROP TABLE IF EXISTS kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d;
CREATE EXTERNAL TABLE IF NOT EXISTS kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d
(
TACONFIRM_BUSINESSCODE string
,TACONFIRM_FUNDCODE string
,TACONFIRM_SHARETYPE string
,TACONFIRM_NETCODE string
,TACONFIRM_CURRENCYTYPE string
,TACONFIRM_CODEOFTARGETFUND string
,TACONFIRM_TARGETSHARETYPE string
,TACONFIRM_TARGETBRANCHCODE string
,TACONFIRM_RETURNCODE string
,TACONFIRM_DEFDIVIDENDMETHOD string
,TACONFIRM_FROZENCAUSE string
,TACONFIRM_TAINTERNALCODE string
,TACONFIRM_C_PROVICE string
,TAPROVINCE_PROVINCENAME string
,TASHARETYPE_SHARETYPENAME string
)
STORED AS SEQUENCEFILE
LOCATION 'hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d';
ALTER TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SET TBLPROPERTIES('auto.purge'='true');
INSERT OVERWRITE TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SELECT
TACONFIRM.BUSINESSCODE as TACONFIRM_BUSINESSCODE
,TACONFIRM.FUNDCODE as TACONFIRM_FUNDCODE
,TACONFIRM.SHARETYPE as TACONFIRM_SHARETYPE
,TACONFIRM.NETCODE as TACONFIRM_NETCODE
,TACONFIRM.CURRENCYTYPE as TACONFIRM_CURRENCYTYPE
,TACONFIRM.CODEOFTARGETFUND as TACONFIRM_CODEOFTARGETFUND
,TACONFIRM.TARGETSHARETYPE as TACONFIRM_TARGETSHARETYPE
,TACONFIRM.TARGETBRANCHCODE as TACONFIRM_TARGETBRANCHCODE
,TACONFIRM.RETURNCODE as TACONFIRM_RETURNCODE
,TACONFIRM.DEFDIVIDENDMETHOD as TACONFIRM_DEFDIVIDENDMETHOD
,TACONFIRM.FROZENCAUSE as TACONFIRM_FROZENCAUSE
,TACONFIRM.TAINTERNALCODE as TACONFIRM_TAINTERNALCODE
,TACONFIRM.C_PROVICE as TACONFIRM_C_PROVICE
,TAPROVINCE.PROVINCENAME as TAPROVINCE_PROVINCENAME
,TASHARETYPE.SHARETYPENAME as TASHARETYPE_SHARETYPENAME
FROM DEFAULT.TACONFIRM as TACONFIRM?
INNER JOIN DEFAULT.TAPROVINCE as TAPROVINCE
ON TACONFIRM.C_PROVICE = TAPROVINCE.C_PROVICE
INNER JOIN DEFAULT.TASHARETYPE as TASHARETYPE
ON TACONFIRM.SHARETYPE = TASHARETYPE.SHARETYPE
WHERE 1=1;
" --hiveconf hive.merge.mapredfiles=false --hiveconf hive.auto.convert.join=true --hiveconf dfs.replication=2 --hiveconf hive.exec.compress.output=true --hiveconf hive.auto.convert.join.noconditionaltask=true --hiveconf mapreduce.job.split.metainfo.maxsize=-1 --hiveconf hive.merge.mapfiles=false --hiveconf hive.auto.convert.join.noconditionaltask.size=100000000 --hiveconf hive.stats.autogather=true
——————————————————————————————————
文件再分配和視圖物化命令例子:
hive -e "USE default;
set mapreduce.job.reduces=3;
set hive.merge.mapredfiles=false;
INSERT OVERWRITE TABLE kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d SELECT * FROM kylin_intermediate_taconfirm_kylin_15all_ddacfb18_3d2e_4e1b_8975_f0871183418d DISTRIBUTE BY RAND();
" --hiveconf hive.merge.mapredfiles=false --hiveconf hive.auto.convert.join=true --hiveconf dfs.replication=2 --hiveconf hive.exec.compress.output=true --hiveconf hive.auto.convert.join.noconditionaltask=true --hiveconf mapreduce.job.split.metainfo.maxsize=-1 --hiveconf hive.merge.mapfiles=false --hiveconf hive.auto.convert.join.noconditionaltask.size=100000000 --hiveconf hive.stats.autogather=true
——————————————————————————————————
創建字典由三個子任務完成,分別是抽取列值、創建字典和保存統計信息,由MR引擎完成,所以直接在build方法中add到任務list中。是否使用字典是構建引擎的選擇,使用字典的好處是有很好的數據壓縮率,可降低存儲空間,同時也提升存儲讀取的速度。缺點是構建字典需要較多的內存資源,創建維度基數超過千萬的容易造成內存溢出。在這個過程最后,還要創建HTable,這屬于存儲引擎的任務,所以是在HBaseMROutput2Transition實例中完成的。
——————————————————————————————————
抽取列值步驟參數例子:
?-conf /usr/local/apps/apache-kylin-2.3.1-bin/conf/kylin_job_conf.xml -cubename Taconfirm_kylin_15all -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -statisticsoutput hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns/statistics -statisticssamplingpercent 100 -jobname Kylin_Fact_Distinct_Columns_Taconfirm_kylin_15all_Step -cubingJobId 4c5d4bb4-791f-4ec3-b3d7-89780adc3f58
——————————————————————————————————
?構建維度字典步驟參數例子 :
?-cubename Taconfirm_kylin_15all -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/fact_distinct_columns -dictPath hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/dict
——————————————————————————————————
創建HTable步驟參數例子:
?-cubename Taconfirm_kylin_15all -segmentid ddacfb18-3d2e-4e1b-8975-f0871183418d -partitions hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-4c5d4bb4-791f-4ec3-b3d7-89780adc3f58/Taconfirm_kylin_15all/rowkey_stats/part-r-00000 -cuboidMode CURRENT
——————————————————————————————————
構建Cube屬于計算引擎的任務,就是根據準備好的數據,依次產生cuboid的數據,在這里調用了兩種構建方法,分別是分層構建和快速構建,但最終只會選擇一種構建方法,分層構建首先調用createBaseCuboidStep方法,生成Base Cuboid數據文件,然后進入for循環,調用createNDimensionCuboidStep方法,根據Base Cuboid計算N層Cuboid數據。
在Cuboid的數據都產生好之后,還需要放到存儲層中,所以接下來調用outputSide實例的addStepPhase3_BuildCube方法,HBaseMROutput2Transition類中的addStepPhase3_BuildCube方法主要有兩步,一是createConvertCuboidToHfileStep方法,將計算引擎產生的cuboid數據轉換成HBase要求的HFile格式,二是createBulkLoadStep方法,即把HFIle數據加載到HBase中。
——————————————————————————————————
構建Base Cuboid步驟參數例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -segmentid 392634bd-4964-428c-a905-9bbf28884452 -input FLAT_TABLE -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_base_cuboid -jobname Kylin_Base_Cuboid_Builder_kylin_sales_cube -level 0 -cubingJobId 6f3c2a9e-7283-4d87-9487-a5ebaffef811
——————————————————————————————————
構建N層Cuboid步驟參數例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -segmentid 392634bd-4964-428c-a905-9bbf28884452 -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_1_cuboid -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/level_2_cuboid -jobname Kylin_ND-Cuboid_Builder_kylin_sales_cube_Step -level 2 -cubingJobId 6f3c2a9e-7283-4d87-9487-a5ebaffef811
——————————————————————————————————
轉換HFile格式步驟參數例子:
?-conf /usr/local/apps/kylin/conf/kylin_job_conf.xml -cubename kylin_sales_cube -partitions hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/rowkey_stats/part-r-00000_hfile -input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/cuboid/* -output hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/hfile -htablename KYLIN_O2SYZPV449 -jobname Kylin_HFile_Generator_kylin_sales_cube_Step
——————————————————————————————————
加載HFile到HBase步驟參數例子:
?-input hdfs://qtbj-sj-cdh-name:8020/kylin/kylin_metadata/kylin-6f3c2a9e-7283-4d87-9487-a5ebaffef811/kylin_sales_cube/hfile -htablename KYLIN_O2SYZPV449 -cubename kylin_sales_cube
——————————————————————————————————
最后一步就是一些收尾工作,包括更新Cube元數據信息,調用inputSide和outputSide實例進行中間臨時數據的清理工作。
完成所有步驟之后,就回到了JobService的submitJob方法中,在得到CubingJob的實例后,會執行以上代碼。這里做的是將CubingJob的信息物化到HBase的kylin_metadata表中,并沒有真正的提交執行。
真正執行CubingJob的地方是在DefaultScheduler,它里面有一個線程會每隔一分鐘,就去HBase的kylin_metadata表中掃一遍所有的CubingJob,然后將需要執行的job,提交到線程池執行。
kylin中任務的構建和執行是異步的。單個kylin節點有query、job和all三種角色,query只提供查詢服務,job只提供真正的構建服務,all則兼具前兩者功能。實際操作中kylin的三種角色節點都可以進行CubingJob的構建,但只有all和job模式的節點可以通過DefaultScheduler進行調度執行
---------------------?
總目錄
Kylin系列(一)—— 入門?
Kylin系列(二)—— Cube 構造算法
總目錄
Kylin cube 構造算法
逐層算法(layer Cubing)
算法的優點
算法的缺點
快速Cube算法(Fast Cubing)
舉個例子
子立方體生成樹(Cuboid spanning Tree)的遍歷次序
優點
缺點
By-layer Spark Cubing算法
改進
Spark中Cubing的過程
性能測試
Kylin cube 構造算法
逐層算法(layer Cubing)
我們知道,一個N維的Cube,是有1個N維子立方體、N個(N-1)維子立方體、N*(N-1)/2個(N-2)維子立方體、……、N個1維子立方體和1個0維子立方體構成,總共有2^N個子立方體。在逐層算法中,按照維度數逐層減少來計算,每個層級的計算(除了第一層,他是從原始數據聚合而來),是基于他上一層級的計算結果來計算的。
比如group by [A,B]的結果,可以基于group by [A,B,C]的結果,通過去掉C后聚合得來的,這樣可以減少重復計算;當0維Cuboid計算出來的時候,整個Cube的計算也就完成了。
如上圖所示,展示了一個4維的Cube構建過程。
此算法的Mapper和Reducer都比較簡單。Mapper以上一層Cuboid的結果(key-value對)作為輸入。由于Key是由各維度值拼接在一起,從其中找出要聚合的維度,去掉它的值成新的key,并對value進行操作,然后把新的key和value輸出,進而Hadoop MapReduce對所有新的key進行排序、洗牌(shuffle)、再送到Reducer處;Reducer的輸入會是一組具有相同key的value集合,對這些value做聚合運算,再結合key輸出就完成了一輪計算。
舉個例子:?
假設一共四個維度A/B/C/D,他們的成員分別是(A1、A2、A3),(B1、B2)、(C1)、(D1),有一個measure(對于這列V,計算sum(V)),這里忽略dictionary編碼。原始表如下:?
那么base cuboid最終的輸出如下?
(A1、B1、C1、D1、2)?
(A1、B2、C1、D1, 3)?
(A2、B1、C1、D1, 5)?
(A3、B1、C1、D1, 6)?
(A3、B2、C1、D1, 8)?
那么它作為下面一個cuboid的輸入,對于第一行輸入?
(A1、B1、C1、D1,2),mapper執行完成之后會輸出?
(A1、B1、C1, 2)、?
(A1、B1、D1, 2)、?
(A1、C1、D1, 2)、?
(B1、C1、D1,2)這四項,同樣對于其他的內一行也會輸出四行,最終他們經過reducer的聚合運算,得到如下的結果:?
(A1、B1、C1, 2)?
(A1、B1、D1, 2)?
(A1、C1、D1, 2 + 3)?
(B1、C1、D1,2 + 5 +6)
這個例子其實在cube的構建過程中可以看到。
一定要注意,這里的每一輪計算都是MapReducer任務,且串行執行;一個N維的Cube,至少需要N次MapReduce Job。
算法的優點
此算法充分利用了MR的能力,處理了中間復雜的排序和洗牌工作,故而算法代碼清晰簡單,易于維護。
受益于Hadoop的日趨成熟,此算法對集群要求低,運行穩定。
算法的缺點
當Cube有比較多維度的時候,所需要的MR任務也相應增加;由于Hadoop的任務調度需要耗費額外資源,特別是集群較龐大的時候,反復遞交任務造成的額外開銷會很可觀
由于Mapper不做預聚合,此算法會對Hadoop MR輸出較多數據;雖然已經使用了Combiner來減少從Mapper端到Reducer端的數據傳輸,所有數據依然需要通過MR來排序和組合才能被聚合,無形之中增加了集群的壓力。
對HDFS的讀寫操作較多:由于每一層計算的輸出會用作下一層計算的輸入,這些Key-value需要寫到HDFS上;當所有計算都完成后,Kylin還需要額外一輪任務將這些文件轉成Hbase的HFile格式,以導入到HBase中去。
總體而言,該算法的效率較低,尤其當Cube維度數較大的時候。
這里其實在困惑到底什么是0維,后來想明白了。舉個例子,現在有一個度量叫成交量。有幾個維度從大到小:業務類型、渠道、門店。3維的例子就是[業務類型、渠道、門店],二維的例子是[業務類型、渠道],一維[業務類型],0維其實就是沒有維度,也就是全部聚合,舉個例子就是
select sum(price) from table1
1
其實在我看來,逐層算法就是先算維度數最高的,一層算完后,再算維度數減少的一層,以此類推。至于為什么從層級高向層級低計算,而不是反過來,在于如果是反過來,那你每次的計算量都是初始數據,數據量非常大,沒必要。
快速Cube算法(Fast Cubing)
快速Cube算法,它還被稱作“逐段”(By Segment)或“逐塊”(By Split)算法。
該算法的主要思想,對Mapper所分配的數據塊,將它計算成一個完整的小Cube段(包含所有Cuboid);每個Mapper將計算完的Cube段輸出給Reducer做合并,生成大Cube,也就是最終結果。
與舊算法相比,快速算法主要有兩點不同:
Mapper會利用內存做預聚合,算出所有組合;Mapper輸出的每個Key都是不同的,這樣會減少輸出到Hadoop MapReduce的數據量,Combiner也不再需要;
一輪MapReduce便會完成所有層次的計算,減少Hadoop任務的調配。
來說個比較。逐層算法的每一層的計算都有一個MapReduce任務,因為是從高維到低維的MR任務,任務之間傳遞的數據量是非常大的。比如上面的例子,生成4維的數據,需要在mapper中對全數據進行的整理,再傳遞給reducer聚合,如果數據量非常大,那么網絡IO是很大的。而快速算法,它會對某個分片數據進行構造完整的cube(所有cuboid)。再將mapper中的數據送入reducer進行大聚合生成Cube。這其實是在map階段就已經完成了聚合,IO是很小的。
舉個例子
這里不理解沒關系,看完后面的構建過程再翻回來看例子就能懂
一個Cube有4個維度:A,B,C,D;每個Mapper都有100萬個源記錄要處理;Mapper中的列基數是Car(A),Car(B),Car(C)和Car(D)。(cardinal 基數)
當講源記錄聚集到base cuboid(1111)時,使用舊的“逐層”算法,每個Mapper將向Hadoop輸出1百萬條記錄;使用快速立方算法,在預聚合之后,它預聚合之后,它只向Hadoop輸出[distinct A,B,C,D]記錄的數量,這樣肯定比源數據小;在正常情況下,他可以源記錄大小的1/10到1/100.
當從父cuboid聚合到子cuboid時,從base cuboid(1111) 到3維cuboid 0111,將會聚合維度A;我們假設維度A與其他維度獨立的,聚合后,cuboid 0111的維度base cuboid的1/Card(A);所以在這一步的輸出將減少到原來的1/Card(A);
總的來說,假設維度的平均基數是Card(N),從Mapper到Reducer的寫入記錄可以減少到原始維度的1/Card(N);Hadoop的輸出越少,I/O和計算越少,性能就越好。
這里要提一句,其實很多都是類似的,比如在hive中處理大表,?
各種的調優都和IO、計算有關系,因為他們都是基于MR任務。
子立方體生成樹(Cuboid spanning Tree)的遍歷次序
在舊算法中,Kylin按照層級,也就是廣度優先遍歷(Broad First Search)的次序計算出各個Cuboid;在快速Cube算法中,Mapper會按照深度優先遍歷(Depth First Search)來計算各個Cuboid。?
深度優先遍歷是一個遞歸方法,將父cuboid壓棧以計算子Cuboid,直到沒有子Cuboid需要計算才出棧并輸出給Hadoop;需要最多暫存N個Cuboid,N是Cube維度數。
采用DFS,是為了兼顧CPU和內存。
從父Cuboid計算子Cuboid,避免重復計算。
只壓棧當前計算的Cuboid的父Cuboid,減少內存占用。?
舉個例子從3維到2維的MR任務中計算CD,BFS會壓入ABC ABD ACD BCD,mapper進行切分,reducer進行聚合;而在DFS中,只會壓入ABCD,BCD,內存大大減少。
上圖是一個四維Cube的完整生成樹:
按照DFS的次序,在0維Cuboid輸出前的計算次序是ABCD-》BCD-》CD-》D-》0維,ABCD,BCD,CD和D需要被暫存;在被輸出后,D可被輸出,內存得到釋放;在C被計算并輸出后,CD就可以被輸出,ABCD最后被輸出。
使用DFS訪問順序,Mapper的輸出已完全排序,因為Cuboid ID位于行鍵的開始位置,而內部的Cuboid的行已排序。
0000?
0001[D0]?
0001[D1]?
....?
0010[C0]?
0010[C1]?
....?
0011[C0][D0]?
0011[C0][D1]?
....?
....?
1111[A0][B0][C0][D0]?
....?
這里的寫法可以看構造過程。?
由于mapper的輸出已經排序,Hadoop的排序效率會更高。
此外,mapper的預聚合發生在內存中,這樣可以避免不必要的磁盤和網絡IO,并減少了hadoop的開銷。
在開發階段,我們在mapper中遇到了OOM錯誤;這可能發生在:?
- Mapper的JVM堆大小很小?
- 使用 distinct count度量?
- 使用樹太深(維度太多)?
- 給Mapper的數據太大
我們意識到Kylin不能認為mapper總是有足夠的內存;Cubing算法需要自適應各種情況;
當主動檢測到OOM錯誤,會優化內存使用并將數據spilling到磁盤上;結果是有希望的,OOM錯誤現在很少發生。
優點
它比舊的方法更快;從我們的比較測試中可以減少30%到50%的build總時間:快在排序,快在IO。
他在Hadoop上產生較少的工作負載,并在HDFS上留下較少的中間文件。
Cubing和Spark等其他立方體引起可以輕松地重復使用該立方體代碼。
缺點
該算法有點復雜,這增加了維護工作;
雖然該算法可以自動將數據spill到磁盤,但他仍希望Mapper有足夠的內存來獲得最佳性能。
用戶需要更多知識來調整立方體。
By-layer Spark Cubing算法
我們知道,RDD(Resilient Distributed DataSet)是Spark中的一個基本概念。N維立方體的組合可以很好地描述為RDD,N維立方體將具有N+1個RDD。這些RDD具有parent/child關系,因為這些parent RDD 可用于生成child RDD。通過將父RDD緩存在內存中,子RDD的生成可以比磁盤讀取更有效。
改進
每一層的cuboid視為一個RDD
父RDD被盡可能cache到內存
RDD 被導出為sequence file
通過將“map”替換為“flatMap”,以及把“reduce”替換為“reduceByKey”,可以復用大部分代碼
Spark中Cubing的過程
下圖DAG(有向無環圖),它詳細說明了這個過程:
在Stage 5中,Kylin使用HiveContext讀取中間Hive表,然后執行一個一對一映射的”map”操作將原始值編碼為KV字節。完成后Kylin得到一個中間編碼的RDD。
在Stage 6中,中間RDD用一個“ReduceByKey”操作聚合以獲得RDD-1,這是base cuboid。接下來,在RDD-1做了一個flatMap(一對多map),因為base cuboid有N個cuboid。以此類推,各級RDD得到計算。在完成時,這些RDD將完整地保存在分布式文件系統,但可以緩存在內存中用于下一級計算。當生成子cuboid時,他將從緩存中刪除。
其實我們和舊的逐層算法去比較會發現,他們之間的構建沒有什么大的差別,只不過Spark的是在內存中進行的,無需從磁盤讀取和網絡IO。并且后面的stage的第一步是reduce。
性能測試
在所有這三種情況下,Spark都比MR快,總體而言它可以減少約一半的時間。
Kylin的構建算法以及和spark的改進?
http://cxy7.com/articles/2018/06/09/1528549073259.html?
https://www.cnblogs.com/zlslch/p/7404465.html
---------------------?
e Kylin是一個開源的分布式分析引擎,提供Hadoop之上的SQL查詢接口及多維分析(OLAP)能力以支持超大規模數據。它能在亞秒內查詢巨大的Hive表。本文將詳細介紹Apache Kylin 1.5中的Fast-Cubing算法。
Fast Cubing,也稱快速數據立方算法, 是一個新的Cube算法。我們知道,Cube的思想是用空間換時間, 通過預先的計算,把索引及結果存儲起來,以換取查詢時候的高性能?。在Kylin v1.5以前,Kylin中的Cube只有一種算法:layered cubing,也稱逐層算法:它是逐層由底向上,把所有組合算完的過程。
Cube構建算法介紹
1 逐層算法(Layer Cubing)
我們知道,一個N維的Cube,是由1個N維子立方體、N個(N-1)維子立方體、N*(N-1)/2個(N-2)維子立方體、......、N個1維子立方體和1個0維子立方體構成,總共有2^N個子立方體組成,在逐層算法中,按維度數逐層減少來計算,每個層級的計算(除了第一層,它是從原始數據聚合而來),是基于它上一層級的結果來計算的。
比如,[Group by A, B]的結果,可以基于[Group by A, B, C]的結果,通過去掉C后聚合得來的;這樣可以減少重復計算;當 0維度Cuboid計算出來的時候,整個Cube的計算也就完成了。
逐層算法
?
如上圖所示,展示了一個4維的Cube構建過程。
此算法的Mapper和Reducer都比較簡單。Mapper以上一層Cuboid的結果(Key-Value對)作為輸入。由于Key是由各維度值拼接在一起,從其中找出要聚合的維度,去掉它的值成新的Key,并對Value進行操作,然后把新Key和Value輸出,進而Hadoop MapReduce對所有新Key進行排序、洗牌(shuffle)、再送到Reducer處;Reducer的輸入會是一組有相同Key的Value集合,對這些Value做聚合計算,再結合Key輸出就完成了一輪計算。
每一輪的計算都是一個MapReduce任務,且串行執行; 一個N維的Cube,至少需要N次MapReduce Job。
Layer Cubing算法優點
此算法充分利用了MapReduce的能力,處理了中間復雜的排序和洗牌工作,故而算法代碼清晰簡單,易于維護;
受益于Hadoop的日趨成熟,此算法對集群要求低,運行穩定;在內部維護Kylin的過程中,很少遇到在這幾步出錯的情況;即便是在Hadoop集群比較繁忙的時候,任務也能完成。
Layer Cubing算法缺點
當Cube有比較多維度的時候,所需要的MapReduce任務也相應增加;由于Hadoop的任務調度需要耗費額外資源,特別是集群較龐大的時候,反復遞交任務造成的額外開銷會相當可觀;
由于Mapper不做預聚合,此算法會對Hadoop MapReduce輸出較多數據; 雖然已經使用了Combiner來減少從Mapper端到Reducer端的數據傳輸,所有數據依然需要通過Hadoop MapReduce來排序和組合才能被聚合,無形之中增加了集群的壓力;
對HDFS的讀寫操作較多:由于每一層計算的輸出會用做下一層計算的輸入,這些Key-Value需要寫到HDFS上;當所有計算都完成后,Kylin還需要額外的一輪任務將這些文件轉成HBase的HFile格式,以導入到HBase中去;
總體而言,該算法的效率較低,尤其是當Cube維度數較大的時候;時常有用戶問,是否能改進Cube算法,縮短時間。
2 快速Cube算法(Fast Cubing)
快速Cube算法(Fast Cubing)是麒麟團隊對新算法的一個統稱,它還被稱作“逐段”(By Segment) 或“逐塊”(By Split) 算法。
該算法的主要思想是,對Mapper所分配的數據塊,將它計算成一個完整的小Cube 段(包含所有Cuboid);每個Mapper將計算完的Cube段輸出給Reducer做合并,生成大Cube,也就是最終結果;圖2解釋了此流程。新算法的核心思想是清晰簡單的,就是最大化利用Mapper端的CPU和內存,對分配的數據塊,將需要的組合全都做計算后再輸出給Reducer;由Reducer再做一次合并(merge),從而計算出完整數據的所有組合。如此,經過一輪Map-Reduce就完成了以前需要N輪的Cube計算。圖2是此算法的概覽。
在Mapper內部, 也可以有一些優化,圖3是一個典型的四維Cube的生成樹;第一步會計算Base Cuboid(所有維度都有的組合),再基于它計算減少一個維度的組合。基于parent節點計算child節點,可以重用之前的計算結果;當計算child節點時,需要parent節點的值盡可能留在內存中; 如果child節點還有child,那么遞歸向下,所以它是一個深度優先遍歷。當有一個節點沒有child,或者它的所有child都已經計算完,這時候它就可以被輸出,占用的內存就可以釋放。
如果內存夠的話,可以多線程并行向下聚合。如此可以最大限度地把計算發生在Mapper這一端,一方面減少shuffle的數據量,另一方面減少Reducer端的計算量。
Fast Cubing的優點:
總的IO量比以前大大減少。?
此算法可以脫離Map-Reduce而對數據做Cube計算,故可以很容易地在其它場景或框架下執行,例如Streaming 和Spark。
Fast Cubing的缺點:
代碼比以前復雜了很多: 由于要做多層的聚合,并且引入多線程機制,同時還要估算JVM可用內存,當內存不足時需要將數據暫存到磁盤,所有這些都增加復雜度。
對Hadoop資源要求較高,用戶應盡可能在Mapper上多分配內存;如果內存很小,該算法需要頻繁借助磁盤,性能優勢就會較弱。在極端情況下(如數據量很大同時維度很多),任務可能會由于超時等原因失敗;
要讓Fast-Cubing算法獲得更高的效率,用戶需要了解更多一些“內情”。
首先,在v1.5里,Kylin在對Fast-Cubing請求資源時候,默認是為Mapper任務請求3Gb的內存,給JVM2.7Gb。如果Hadoop節點可用內存較多的話,用戶可以讓Kylin獲得更多內存:在conf/kylin_job_conf_inmem.xml文件,由參數“mapreduce.map.memory.mb”和“mapreduce.map.java.opts”設定 。
其次,需要在并發性和Mapper端聚合之間找到一個平衡。在v1.5.2里,Kylin默認是給每個Mapper分配32兆的數據;這樣可以獲得較高的并發性。但如果Hadoop集群規模較小,或可用資源較少,過多的Mapper會造成任務排隊。這時,將數據塊切得更大,如 64兆,效果會更好。數據塊是由Kylin創建Hive平表時生成的, 在kylin_hive_conf.xml由參數dfs.block.size決定的。從v1.5.3開始,分配策略又有改進,給每個mapper會分配一樣的行數,從而避免數據塊不均勻時的木桶效應:由conf/kylin.properteis里的“kylin.job.mapreduce.mapper.input.rows”配置,默認是100萬,用戶可以示自己集群的規模設置更小值獲得更高并發,或更大值減少請求的Mapper數。
通常推薦Fast-Cubing 算法,但不是所有情況下都如此。舉例說明,如果每個Mapper之間的key交叉重合度較低,fast cubing更適合;因為Mapper端將這塊數據最終要計算的結果都達到了,Reducer只需少量的聚合。另一個極端是,每個Mapper計算出的key跟其它 Mapper算出的key深度重合,這意味著在reducer端仍需將各個Mapper的數據抓取來再次聚合計算;如果key的數量巨大,該過程IO開銷依然顯著。對于這種情況,Layered-Cubing更適合。
用戶該如何選擇算法呢?無需擔心,Kylin會自動選擇合適的算法。Kylin在計算Cube之前對數據進行采樣,在“fact distinct”步,利用HyperLogLog模擬去重,估算每種組合有多少不同的key,從而計算出每個Mapper輸出的數據大小,以及所有Mapper之間數據的重合度,據此來決定采用哪種算法更優。在對上百個Cube任務的時間做統計分析后,Kylin選擇了7做為默認的算法選擇閥值(參數kylin.cube.algorithm.layer-or-inmem-threshold):如果各個Mapper的小Cube的行數之和,大于reduce后的Cube行數的7倍,采用Layered Cubing, 反之采用Fast Cubing。如果用戶在使用過程中,更傾向于使用Fast Cubing,可以適當調大此參數值,反之調小。
????????????????int mapperNumLimit = kylinConf.getCubeAlgorithmAutoMapperLimit();
????????????????double overlapThreshold = kylinConf.getCubeAlgorithmAutoThreshold(); // default 7
????????????????logger.info("mapperNumber for " + seg + " is " + mapperNumber + " and threshold is " + mapperNumLimit);
????????????????logger.info("mapperOverlapRatio for " + seg + " is " + mapperOverlapRatio + " and threshold is " + overlapThreshold);
????????????????// in-mem cubing is good when
????????????????// 1) the cluster has enough mapper slots to run in parallel
????????????????// 2) the mapper overlap ratio is small, meaning the shuffle of in-mem MR has advantage
????????????????alg = (mapperNumber <= mapperNumLimit && mapperOverlapRatio <= overlapThreshold)//
????????????????????????? CubingJob.AlgorithmEnum.INMEM
????????????????????????: CubingJob.AlgorithmEnum.LAYER;
Kylin Cube 構建算法結論(逐層算法和快速算法):
1、如果每個Mapper之間的key交叉重合度較低,fast cubing更適合;因為Mapper端將這塊數據最終要計算的結果都達到了,Reducer只需少量的聚合。另一個極端是,每個Mapper計算出的key跟其它 Mapper算出的key深度重合,這意味著在reducer端仍需將各個Mapper的數據抓取來再次聚合計算;如果key的數量巨大,該過程IO開銷依然顯著。對于這種情況,Layered-Cubing更適合。
2、在對上百個Cube任務的時間做統計分析后,Kylin選擇了7做為默認的算法選擇閥值(參數kylin.cube.algorithm.auto.threshold):如果各個Mapper的小Cube的行數之和,大于reduce后的Cube行數的8倍(各個Mapper的小Cube的行數之和 /?reduce后的Cube行數 > 7),采用Layered Cubing, 反之采用Fast Cubing(本質就是各個Mapper之間的key重復度越小,就用Fast Cubing,重復度越大,就用Layered Cubing)
---------------------?
轉載于:https://my.oschina.net/hblt147/blog/3006400
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的kylin KV+cube方案分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: gRPC入门
- 下一篇: unity2D限制位置的背景移动补偿效果