浅析Hadoop文件格式
http://www.infoq.com/cn/articles/hadoop-file-format
Hadoop 作為MR 的開源實現,一直以動態運行解析文件格式并獲得比MPP數據庫快上幾倍的裝載速度為優勢。不過,MPP數據庫社區也一直批評Hadoop由于文件格式并非為特定目的而建,因此序列化和反序列化的成本過高[7]。本文介紹Hadoop目前已有的幾種文件格式,分析其特點、開銷及使用場景。希望加深讀者對Hadoop文件格式及其影響性能的因素的理解。
Hadoop 中的文件格式
1 SequenceFile
SequenceFile是Hadoop API 提供的一種二進制文件,它將數據以<key,value>的形式序列化到文件中。這種二進制文件內部使用Hadoop 的標準的Writable 接口實現序列化和反序列化。它與Hadoop API中的MapFile 是互相兼容的。Hive 中的SequenceFile 繼承自Hadoop API 的SequenceFile,不過它的key為空,使用value 存放實際的值, 這樣是為了避免MR 在運行map 階段的排序過程。如果你用Java API 編寫SequenceFile,并讓Hive 讀取的話,請確保使用value字段存放數據,否則你需要自定義讀取這種SequenceFile 的InputFormat class 和OutputFormat class。
圖1:Sequencefile 文件結構
2 RCFile
RCFile是Hive推出的一種專門面向列的數據格式。 它遵循“先按列劃分,再垂直劃分”的設計理念。當查詢過程中,針對它并不關心的列時,它會在IO上跳過這些列。需要說明的是,RCFile在map階段從遠端拷貝仍然是拷貝整個數據塊,并且拷貝到本地目錄后RCFile并不是真正直接跳過不需要的列,并跳到需要讀取的列, 而是通過掃描每一個row group的頭部定義來實現的,但是在整個HDFS Block 級別的頭部并沒有定義每個列從哪個row group起始到哪個row group結束。所以在讀取所有列的情況下,RCFile的性能反而沒有SequenceFile高。圖2:RCFile 文件結構
3 Avro
Avro是一種用于支持數據密集型的二進制文件格式。它的文件格式更為緊湊,若要讀取大量數據時,Avro能夠提供更好的序列化和反序列化性能。并且Avro數據文件天生是帶Schema定義的,所以它不需要開發者在API 級別實現自己的Writable對象。最近多個Hadoop 子項目都支持Avro 數據格式,如Pig 、Hive、Flume、Sqoop和Hcatalog。
圖3:Avro MR 文件格式
4. 文本格式
除上面提到的3種二進制格式之外,文本格式的數據也是Hadoop中經常碰到的。如TextFile 、XML和JSON。 文本格式除了會占用更多磁盤資源外,對它的解析開銷一般會比二進制格式高幾十倍以上,尤其是XML 和JSON,它們的解析開銷比Textfile 還要大,因此強烈不建議在生產系統中使用這些格式進行儲存。 如果需要輸出這些格式,請在客戶端做相應的轉換操作。 文本格式經常會用于日志收集,數據庫導入,Hive默認配置也是使用文本格式,而且常常容易忘了壓縮,所以請確保使用了正確的格式。另外文本格式的一個缺點是它不具備類型和模式,比如銷售金額、利潤這類數值數據或者日期時間類型的數據,如果使用文本格式保存,由于它們本身的字符串類型的長短不一,或者含有負數,導致MR沒有辦法排序,所以往往需要將它們預處理成含有模式的二進制格式,這又導致了不必要的預處理步驟的開銷和儲存資源的浪費。
5. 外部格式
Hadoop實際上支持任意文件格式,只要能夠實現對應的RecordWriter和RecordReader即可。其中數據庫格式也是會經常儲存在Hadoop中,比如Hbase,Mysql,Cassandra,MongoDB。 這些格式一般是為了避免大量的數據移動和快速裝載的需求而用的。他們的序列化和反序列化都是由這些數據庫格式的客戶端完成,并且文件的儲存位置和數據布局(Data Layout)不由Hadoop控制,他們的文件切分也不是按HDFS的塊大小(blocksize)進行切割。
文件存儲大小比較與分析
我們選取一個TPC-H標準測試來說明不同的文件格式在存儲上的開銷。因為此數據是公開的,所以讀者如果對此結果感興趣,也可以對照后面的實驗自行做一遍。Orders 表文本格式的原始大小為1.62G。 我們將其裝載進Hadoop 并使用Hive 將其轉化成以上幾種格式,在同一種LZO 壓縮模式下測試形成的文件的大小。
| Orders_text1 | 1732690045 | 1.61G | 非壓縮 | TextFile |
| Orders_tex2 | 772681211 | 736M | LZO壓縮 | TextFile |
| Orders_seq1 | 1935513587 | 1.80G | 非壓縮 | SequenceFile |
| Orders_seq2 | 822048201 | 783M | LZO壓縮 | SequenceFile |
| Orders_rcfile1 | 1648746355 | 1.53G | 非壓縮 | RCFile |
| Orders_rcfile2 | 686927221 | 655M | LZO壓縮 | RCFile |
| Orders_avro_table1 | 1568359334 | 1.46G | 非壓縮 | Avro |
| Orders_avro_table2 | 652962989 | 622M | LZO壓縮 | Avro |
表1:不同格式文件大小對比
從上述實驗結果可以看到,SequenceFile無論在壓縮和非壓縮的情況下都比原始純文本TextFile大,其中非壓縮模式下大11%, 壓縮模式下大6.4%。這跟SequenceFile的文件格式的定義有關: SequenceFile在文件頭中定義了其元數據,元數據的大小會根據壓縮模式的不同略有不同。一般情況下,壓縮都是選取block 級別進行的,每一個block都包含key的長度和value的長度,另外每4K字節會有一個sync-marker的標記。對于TextFile文件格式來說不同列之間只需要用一個行間隔符來切分,所以TextFile文件格式比SequenceFile文件格式要小。但是TextFile 文件格式不定義列的長度,所以它必須逐個字符判斷每個字符是不是分隔符和行結束符。因此TextFile 的反序列化開銷會比其他二進制的文件格式高幾十倍以上。
RCFile文件格式同樣也會保存每個列的每個字段的長度。但是它是連續儲存在頭部元數據塊中,它儲存實際數據值也是連續的。另外RCFile 會每隔一定塊大小重寫一次頭部的元數據塊(稱為row group,由hive.io.rcfile.record.buffer.size控制,其默認大小為4M),這種做法對于新出現的列是必須的,但是如果是重復的列則不需要。RCFile 本來應該會比SequenceFile 文件大,但是RCFile 在定義頭部時對于字段長度使用了Run Length Encoding進行壓縮,所以RCFile 比SequenceFile又小一些。Run length Encoding針對固定長度的數據格式有非常高的壓縮效率,比如Integer、Double和Long等占固定長度的數據類型。在此提一個特例——Hive 0.8引入的TimeStamp 時間類型,如果其格式不包括毫秒,可表示為”YYYY-MM-DD HH:MM:SS”,那么就是固定長度占8個字節。如果帶毫秒,則表示為”YYYY-MM-DD HH:MM:SS.fffffffff”,后面毫秒的部分則是可變的。
Avro文件格式也按group進行劃分。但是它會在頭部定義整個數據的模式(Schema), 而不像RCFile那樣每隔一個row group就定義列的類型,并且重復多次。另外,Avro在使用部分類型的時候會使用更小的數據類型,比如Short或者Byte類型,所以Avro的數據塊比RCFile 的文件格式塊更小。
序列化與反序列化開銷分析
我們可以使用Java的profile工具來查看Hadoop 運行時任務的CPU和內存開銷。以下是在Hive 命令行中的設置:
hive>set mapred.task.profile=true;hive>set mapred.task.profile.params =-agentlib:hprof=cpu=samples,heap=sites, depth=6,force=n,thread=y,verbose=n,file=%s當map task 運行結束后,它產生的日志會寫在$logs/userlogs/job- 文件夾下。當然,你也可以直接在JobTracker的Web界面的logs或jobtracker.jsp 頁面找到日志。
我們運行一個簡單的SQL語句來觀察RCFile 格式在序列化和反序列化上的開銷:
hive> select O_CUSTKEY,O_ORDERSTATUS from orders_rc2 where O_ORDERSTATUS='P';其中的O_CUSTKEY列為integer類型,O_ORDERSTATUS為String類型。在日志輸出的最后會包含內存和CPU 的消耗。
下表是一次CPU 的開銷:
| rank | self | accum | count | trace | method |
| 20?????????? | 0.48% | 79.64% | 65 | 315554 | org.apache.hadoop.hive.ql.io.RCFile$Reader.getCurrentRow |
| 28 | 0.24% | 82.07% | 32 | 315292 | org.apache.hadoop.hive.serde2.columnar.ColumnarStruct.init |
| 55 | 0.10% | 85.98% | 14 | 315788 | org.apache.hadoop.hive.ql.io.RCFileRecordReader.getPos |
| 56 | 0.10% | 86.08% | 14 | 315797 | org.apache.hadoop.hive.ql.io.RCFileRecordReader.next |
表2:一次CPU的開銷
其中第五列可以對照上面的Track信息查看到底調用了哪些函數。比如CPU消耗排名20的函數對應Track:
TRACE 315554: (thread=200001) org.apache.hadoop.hive.ql.io.RCFile$Reader.getCurrentRow(RCFile.java:1434) org.apache.hadoop.hive.ql.io.RCFileRecordReader.next(RCFileRecordReader.java:88) org.apache.hadoop.hive.ql.io.RCFileRecordReader.next(RCFileRecordReader.java:39)org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:98)org.apache.hadoop.hive.ql.io.CombineHiveRecordReader.doNext(CombineHiveRecordReader.java:42) org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader.next(HiveContextAwareRecordReader.java:67)其中,比較明顯的是RCFile,它為了構造行而消耗了不必要的數組移動開銷。其主要是因為RCFile 為了還原行,需要構造RowContainer,順序讀取一行構造RowContainer,然后給其中對應的列進行賦值,因為RCFile早期為了兼容SequenceFile所以可以合并兩個block,又由于RCFile不知道列在哪個row group結束,所以必須維持數組的當前位置,類似如下格式定義:
Array<RowContainer extends List<Object>>而此數據格式可以改為面向列的序列化和反序列化方式。如:
Map<array<col1Type>,array<col2Type>,array<col3Type>....>這種方式的反序列化會避免不必要的數組移動,當然前提是我們必須知道列在哪個row group開始到哪個row group結束。這種方式會提高整體反序列化過程的效率。
關于Hadoop文件格式的思考
1 高效壓縮
Hadoop目前尚未出現針對數據特性的高效編碼(Encoding)和解碼(Decoding)數據格式。尤其是支持Run Length Encoding、Bitmap 這些極為高效算法的數據格式。HIVE-2065 討論過使用更加高效的壓縮形式,但是對于如何選取列的順序沒有結論。關于列順序選擇可以看Daniel Lemire的一篇論文 《Reordering Columns for Smaller Indexes》[1]。作者同時也是Hive 0.8中引入的bitmap 壓縮算法基礎庫的作者。該論文的結論是:當某個表需要選取多個列進行壓縮時,需要根據列的選擇性(selectivity)進行升序排列,即唯一值越少的列排得越靠前。 事實上這個結論也是Vertica多年來使用的數據格式。其他跟壓縮有關的還有HIVE-2604和HIVE-2600。
2 基于列和塊的序列化和反序列化
不論排序后的結果是不是真的需要,目前Hadoop的整體框架都需要不斷根據數據key進行排序。除了上面提到的基于列的排序,序列化和反序列化之外,Hadoop的文件格式應該支持某種基于塊(Block) 級別的排序和序列化及反序列化方式,只有當數據滿足需要時才進行這些操作。來自Google Tenzing論文中曾將它作為MR 的優化手段提到過。
“Block Shuffle:正常來說,MR 在Shuffle 的時候使用基于行的編碼和解碼。為了逐個處理每一行,數據必須先排序。然而,當排序不是必要的時候這種方式并不高效,我們在基于行的shuffle基礎上實現了一種基于block的shuffle方式,每一次處理大概1M的壓縮block,通過把整個block當成一行,我們能夠避免MR框架上的基于行的序列化和反序列化消耗,這種方式比基于行的shuffle 快上3倍以上。”
3 數據過濾(Skip List)
除常見的分區和索引之外,使用排序之后的塊(Block)間隔也是常見列數據庫中使用的過濾數據的方法。Google Tenzing同樣描述了一種叫做ColumnIO 的數據格式,ColumnIO在頭部定義該Block的最大值和最小值,在進行數據判斷的時候,如果當前Block的頭部信息里面描述的范圍中不包含當前需要處理的內容,則會直接跳過該塊。Hive社區里曾討論過如何跳過不需要的塊 ,可是因為沒有排序所以一直沒有較好的實現方式。包括RCFile格式,Hive的index 機制里面目前還沒有一個高效的根據頭部元數據就可以跳過塊的實現方式。
4 延遲物化
真正好的列數據庫,都應該可以支持直接在壓縮數據之上不需要通過解壓和排序就能夠直接操作塊。通過這種方式可以極大的降低MR 框架或者行式數據庫中先解壓,再反序列化,然后再排序所帶來的開銷。Google Tenzing里面描述的Block Shuffle 也屬于延遲物化的一種。更好的延遲物化可以直接在壓縮數據上進行操作,并且可以做內部循環, 此方面在論文《Integrating Compression and Execution in Column-Oriented Database System》[5]的5.2 章節有描述。 不過考慮到它跟UDF 集成也有關系,所以,它會不會將文件接口變得過于復雜也是一件有爭議的事情。
5 與Hadoop框架集成
無論文本亦或是二進制格式,都只是最終的儲存格式。Hadoop運行時產生的中間數據卻沒有辦法控制。包括一個MR Job在map和reduce之間產生的數據或者DAG Job上游reduce 和下游map之間的數據,尤其是中間格式并不是列格式,這會產生不必要的IO和CPU 開銷。比如map 階段產生的spill,reduce 階段需要先copy 再sort-merge。如果這種中間格式也是面向列的,然后將一個大塊切成若干小塊,并在頭部加上每個小塊的最大最小值索引,就可以避免大量sort-mege操作中解壓—反序列化—排序—合并(Merge)的開銷,從而縮短任務的運行時間。
其他文件格式
Hadoop社區也曾有對其他文件格式的研究。比如,IBM 研究過面向列的數據格式并發表論文《Column-Oriented Storage Techniques for MapReduce》[4],其中特別提到IBM 的CIF(Column InputFormat)文件格式在序列化和反序列化的IO消耗上比RCFile 的消耗要小20倍。里面提到的將列分散在不同的HDFS Block 塊上的實現方式RCFile 也有考慮過,但是最后因為重組行的消耗可能會因分散在遠程機器上產生的延遲而最終放棄了這種實現。此外,最近Avro也在實現一種面向列的數據格式,不過目前Hive 與Avro 集成尚未全部完成。有興趣的讀者可以關注avro-806 和hive-895。
總結
Hadoop 可以與各種系統兼容的前提是Hadoop MR 框架本身能夠支持多種數據格式的讀寫。但如果要提升其性能,Hadoop 需要一種高效的面向列的基于整個MR 框架集成的數據格式。尤其是高效壓縮,塊重組(block shuffle),數據過濾(skip list)等高級功能,它們是列數據庫相比MR 框架在文件格式上有優勢的地方。相信隨著社區的發展以及Hadoop 的逐步成熟,未來會有更高效且統一的數據格式出現。
?
============
public class SequenceFileReadDemo {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, path, conf);
Writable key = (Writable)
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable)
ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : "";
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next record
}
} finally {
IOUtils.closeStream(reader);
}
}
}
總結
以上是生活随笔為你收集整理的浅析Hadoop文件格式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: HBase安装过程
- 下一篇: Message-Digest Algor