利用Hadoop Streaming处理二进制格式文件
注:本文用到的程序實例可在百度云:hadoop-streaming-binary-examples?下載。
在詳細介紹操作步驟之前,先介紹本文給出的實例。假設有這樣的SequenceFile,它保存了手機通訊錄信息,其中,key是好友名,value是描述該好友的一個結構體或者對象,為此,本文使用了google開源的protocol buffer這一序列化/反序列化框架,protocol buffer結構體定義如下:
| 1 2 3 4 5 6 7 8 9 | option java_package = ""; option java_outer_classname="PersonInfo"; message Person { ??optional string name = 1; ??optional int32 age = 2; ??optional int64 phone = 3; ??optional string address = 4; } |
SequenceFile文件中的value便是保存的Person對象序列化后的字符串,這是典型的二進制數據,不能像文本數據那樣可通過換行符解析出每條記錄,因為二進制數據的每條記錄中可能包含任意字符,包括換行符。
一旦有了這樣的SequenceFile之后,我們將使用Hadoop Streaming編寫這樣的MapReduce程序:這個MapReduce程序只有Map Task,任務是解析出文件中的每條好友記錄,并以name \t age,phone,address的文本格式保存到HDFS上。
1. 準備數據
首先,我們需要準備上面介紹的SequenceFile數據,生成數據的核心代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 | final SequenceFile.Writer out = ????????SequenceFile.createWriter(fs, getConf(), new Path(args[0]), ????????????????Text.class, BytesWritable.class); Text nameWrapper = new Text(); BytesWritable personWrapper = new BytesWritable(); System.out.println("Generating " + num + " Records......"); for(int i = 0; i < num; i++) { ??genOnePerson(nameWrapper, personWrapper); ??System.out.println("Generating " + i + " Records," + nameWrapper.toString() + "......"); ??out.append(nameWrapper, personWrapper); } out.close(); |
當然,為了驗證我們產生的數據是否正確,需要編寫一個解析程序,核心代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 | Reader reader = new Reader(fs, new Path(args[0]), getConf()); Text key = new Text(); BytesWritable value = new BytesWritable(); while(reader.next(key, value)) { ??System.out.println("key:" + key.toString()); ??value.setCapacity(value.getSize()); // Very important!!! Very Tricky!!! ??PersonInfo.Person person = PersonInfo.Person.parseFrom(value.getBytes()); ??System.out.println("age:" + person.getAge() ??????????+ ",address:" + person.getAddress() ??????????+",phone:" + person.getPhone()); } reader.close(); |
需要注意的,Value保存類型為BytesWritable,使用這個類型非常容易犯錯誤。當你把一堆byte[]數據保存到BytesWritable后,通過BytesWritable.getBytes()再讀到的數據并不一定是原數據,可能變長了很多,這是因為BytesWritable采用了自動內存增長算法,你保存的數據長度為size時,它可能將數據保存到了長度為capacity(capacity>size)的buffer中,這時候,你通過BytesWritable.getBytes()得到的數據最后一些字符是多余的,如果里面保存的是protocol buffer序列化后的字符串,則無法反序列化,這時候可以使用BytesWritable.setCapacity (value.getSize())將后面多余空間剔除掉。
2. 使用Hadoop Streaming編寫C++程序
為了說明Hadoop Streaming如何處理二進制格式數據,本文僅僅以C++語言為例進行說明,其他語言的設計方法類似。
先簡單說一下原理。當輸入數據是二進制格式時,Hadoop Streaming會對輸入key和value進行編碼后,通過標準輸入傳遞給你的Hadoop Streaming程序,目前提供了兩種編碼格式,分別是rawtypes和???????? typedbytes,你可以設計你想采用的格式,這兩種編碼規則如下(具體在文章“Hadoop Streaming高級編程”中已經介紹了):
rawbytes:key和value均用【4個字節的長度+原始字節】表示
typedbytes:key和value均用【1字節類型+4字節長度+原始字節】表示
本文將采用第一種編碼格式進行說明。采用這種編碼意味著你不能想文本數據那樣一次獲得一行內容,而是依次獲得key和value序列,其中key和value都由兩部分組成,第一部分是長度(4個字節),第二部分是字節內容,比如你的key是dongxicheng,value是goodman,則傳遞給hadoop streaming程序的輸入數據格式為11 dongxicheng 7 goodman。為此,我們編寫下面的Mapper程序解析這種數據:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | int main() { ?string key, value; ?while(!cin.eof()) { ??if(!FileUtil::ReadString(key, cin)) ???break; ??FileUtil::ReadString(value, cin); ??Person person; ??ProtoUtil::ParseFromString(value, person); ??cout << person.name() << "\t" << person.age() ???????<< "," << person.address() ???????<< "," << person.phone() << endl; ?} ?return 0; } |
其中,輔助函數實現如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | class ProtoUtil { ?public: ??static bool ParseFromString(const string& str, Person &person) { ???if(person.ParseFromString(str)) ????return true; ???return false; ??} }; class FileUtil { ?public: ??static bool ReadInt(unsigned int *len, istream &stream) { ???if(!stream.read((char *)len, sizeof(unsigned int))) ????return false; ???*len = bswap_32(*len); ???return true; ??} ??static bool ReadString(string &str, istream &stream) { ???unsigned int len; ???if(!ReadInt(&len, stream)) ????return false; ???str.resize(len); ???if(!ReadBytes(&str[0], len, stream)) ????return false; ???return true; ??} ??static bool ReadBytes(char *ptr, unsigned int len, istream &stream) { ???stream.read(ptr, sizeof(unsigned char) * len); ???if(stream.eof()) return false; ???return true; ??} }; |
該程序需要注意以下幾點:
(1)注意大小端編碼規則,解析key和value長度時,需要對長度進行字節翻轉。
(2)注意循環結束條件,僅僅靠!cin.eof()判定是不夠的,僅靠這個判定會導致多輸出一條重復數據。
(3)本程序只能運行在linux系統下,windows操作系統下將無法運行,因為windows下的標準輸入cin并直接支持二進制數據讀取,需要將其強制以二進制模式重新打開后再使用。
3. 程序測試與運行
程序寫好后,第一步是編譯C++程序。由于該程序需要運行在多節點的Hadoop集群上,為了避免部署或者分發動態庫帶來的麻煩,我們直接采用靜態編譯方式,這也是編寫Hadoop C++程序的基本規則。為了靜態編譯以上MapReduce程序,安裝protocol buffers時,需采用以下流程(強調第一步),
./configure –disable-shared
make –j4
make install
然后使用以下命令編譯程序,生成可執行文件ProtoMapper:
g++ -o ProtoMapper ProtoMapper.cpp person.pb.cc `pkg-config –cflags –static –libs protobuf` -lpthread
在正式將程序提交到Hadoop集群之前,需要先在本地進行測試,本地測試運行腳本如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | #!/bin/bash HADOOP_HOME=/opt/dong/yarn-client INPUT_PATH=/tmp/person.seq OUTPUT_PATH=file:///tmp/output111 echo "Clearing output path: $OUTPUT_PATH" $HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH ${HADOOP_HOME}/bin/hadoop jar\ ???${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar\ ??-D mapred.reduce.tasks=0\ ??-D stream.map.input=rawbytes\ ??-files ProtoMapper\ ??-jt local\ ??-fs local\ ??-input $INPUT_PATH\ ??-output $OUTPUT_PATH\ ??-inputformat SequenceFileInputFormat\ ??-mapper ProtoMapper |
注意以下幾點:
(1)使用stream.map.input指定輸入數據解析成rawbytes格式
(2) 使用-jt和-fs兩個參數將程序運行模式設置為local模式
(3)使用-inputformat指定輸入數據格式為SequenceFileInputFormat
(4)使用mapred.reduce.tasks將Reduce Task數目設置為0
在本地tmp/output111目錄下查看測試結果是否正確,如果沒問題,可改寫該腳本(去掉-fs和-jt兩個參數,將輸入和輸出目錄設置成HDFS上的目錄),將程序直接運行在Hadoop上。
原創文章,轉載請注明:?轉載自董的博客
本文鏈接地址:?http://dongxicheng.org/mapreduce-nextgen/hadoop-streaming-process-binary-data/
總結
以上是生活随笔為你收集整理的利用Hadoop Streaming处理二进制格式文件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Spark学习:利用Sca
- 下一篇: 如何高效的阅读Hadoop源代码?Had