日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

利用Hadoop Streaming处理二进制格式文件

發布時間:2025/3/21 编程问答 18 豆豆
生活随笔 收集整理的這篇文章主要介紹了 利用Hadoop Streaming处理二进制格式文件 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Hadoop Streaming是Hadoop提供的多語言編程工具,用戶可以使用自己擅長的編程語言(比如python、php或C#等)編寫Mapper和Reducer處理文本數據。Hadoop Streaming自帶了一些配置參數可友好地支持多字段文本數據的處理,參與Hadoop Streaming介紹和編程,可參考我的這篇文章: “Hadoop Streaming編程實例” 。然而,隨著Hadoop應用越來越廣泛,用戶希望Hadoop Streaming不局限在處理文本數據上,而是具備更加強大的功能,包括能夠處理二進制數據;能夠支持多語言編寫Combiner等組件。隨著Hadoop 2.x的發布,這些功能已經基本上得到了完整的實現,本文將介紹如何使用Hadoop Streaming處理二進制格式的文件,包括 SequenceFile HFile 等。

注:本文用到的程序實例可在百度云:hadoop-streaming-binary-examples?下載。

在詳細介紹操作步驟之前,先介紹本文給出的實例。假設有這樣的SequenceFile,它保存了手機通訊錄信息,其中,key是好友名,value是描述該好友的一個結構體或者對象,為此,本文使用了google開源的protocol buffer這一序列化/反序列化框架,protocol buffer結構體定義如下:

1 2 3 4 5 6 7 8 9 option java_package = ""; option java_outer_classname="PersonInfo"; message Person { ??optional string name = 1; ??optional int32 age = 2; ??optional int64 phone = 3; ??optional string address = 4; }

SequenceFile文件中的value便是保存的Person對象序列化后的字符串,這是典型的二進制數據,不能像文本數據那樣可通過換行符解析出每條記錄,因為二進制數據的每條記錄中可能包含任意字符,包括換行符。

一旦有了這樣的SequenceFile之后,我們將使用Hadoop Streaming編寫這樣的MapReduce程序:這個MapReduce程序只有Map Task,任務是解析出文件中的每條好友記錄,并以name \t age,phone,address的文本格式保存到HDFS上。

1. 準備數據

首先,我們需要準備上面介紹的SequenceFile數據,生成數據的核心代碼如下:

1 2 3 4 5 6 7 8 9 10 11 12 final SequenceFile.Writer out = ????????SequenceFile.createWriter(fs, getConf(), new Path(args[0]), ????????????????Text.class, BytesWritable.class); Text nameWrapper = new Text(); BytesWritable personWrapper = new BytesWritable(); System.out.println("Generating " + num + " Records......"); for(int i = 0; i < num; i++) { ??genOnePerson(nameWrapper, personWrapper); ??System.out.println("Generating " + i + " Records," + nameWrapper.toString() + "......"); ??out.append(nameWrapper, personWrapper); } out.close();

當然,為了驗證我們產生的數據是否正確,需要編寫一個解析程序,核心代碼如下:

1 2 3 4 5 6 7 8 9 10 11 12 Reader reader = new Reader(fs, new Path(args[0]), getConf()); Text key = new Text(); BytesWritable value = new BytesWritable(); while(reader.next(key, value)) { ??System.out.println("key:" + key.toString()); ??value.setCapacity(value.getSize()); // Very important!!! Very Tricky!!! ??PersonInfo.Person person = PersonInfo.Person.parseFrom(value.getBytes()); ??System.out.println("age:" + person.getAge() ??????????+ ",address:" + person.getAddress() ??????????+",phone:" + person.getPhone()); } reader.close();

需要注意的,Value保存類型為BytesWritable,使用這個類型非常容易犯錯誤。當你把一堆byte[]數據保存到BytesWritable后,通過BytesWritable.getBytes()再讀到的數據并不一定是原數據,可能變長了很多,這是因為BytesWritable采用了自動內存增長算法,你保存的數據長度為size時,它可能將數據保存到了長度為capacity(capacity>size)的buffer中,這時候,你通過BytesWritable.getBytes()得到的數據最后一些字符是多余的,如果里面保存的是protocol buffer序列化后的字符串,則無法反序列化,這時候可以使用BytesWritable.setCapacity (value.getSize())將后面多余空間剔除掉。

2. 使用Hadoop Streaming編寫C++程序

為了說明Hadoop Streaming如何處理二進制格式數據,本文僅僅以C++語言為例進行說明,其他語言的設計方法類似。

先簡單說一下原理。當輸入數據是二進制格式時,Hadoop Streaming會對輸入key和value進行編碼后,通過標準輸入傳遞給你的Hadoop Streaming程序,目前提供了兩種編碼格式,分別是rawtypes和???????? typedbytes,你可以設計你想采用的格式,這兩種編碼規則如下(具體在文章“Hadoop Streaming高級編程”中已經介紹了):

rawbytes:key和value均用【4個字節的長度+原始字節】表示

typedbytes:key和value均用【1字節類型+4字節長度+原始字節】表示

本文將采用第一種編碼格式進行說明。采用這種編碼意味著你不能想文本數據那樣一次獲得一行內容,而是依次獲得key和value序列,其中key和value都由兩部分組成,第一部分是長度(4個字節),第二部分是字節內容,比如你的key是dongxicheng,value是goodman,則傳遞給hadoop streaming程序的輸入數據格式為11 dongxicheng 7 goodman。為此,我們編寫下面的Mapper程序解析這種數據:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 int main() { ?string key, value; ?while(!cin.eof()) { ??if(!FileUtil::ReadString(key, cin)) ???break; ??FileUtil::ReadString(value, cin); ??Person person; ??ProtoUtil::ParseFromString(value, person); ??cout << person.name() << "\t" << person.age() ???????<< "," << person.address() ???????<< "," << person.phone() << endl; ?} ?return 0; }

其中,輔助函數實現如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class ProtoUtil { ?public: ??static bool ParseFromString(const string& str, Person &person) { ???if(person.ParseFromString(str)) ????return true; ???return false; ??} }; class FileUtil { ?public: ??static bool ReadInt(unsigned int *len, istream &stream) { ???if(!stream.read((char *)len, sizeof(unsigned int))) ????return false; ???*len = bswap_32(*len); ???return true; ??} ??static bool ReadString(string &str, istream &stream) { ???unsigned int len; ???if(!ReadInt(&len, stream)) ????return false; ???str.resize(len); ???if(!ReadBytes(&str[0], len, stream)) ????return false; ???return true; ??} ??static bool ReadBytes(char *ptr, unsigned int len, istream &stream) { ???stream.read(ptr, sizeof(unsigned char) * len); ???if(stream.eof()) return false; ???return true; ??} };

該程序需要注意以下幾點:

(1)注意大小端編碼規則,解析key和value長度時,需要對長度進行字節翻轉。

(2)注意循環結束條件,僅僅靠!cin.eof()判定是不夠的,僅靠這個判定會導致多輸出一條重復數據。

(3)本程序只能運行在linux系統下,windows操作系統下將無法運行,因為windows下的標準輸入cin并直接支持二進制數據讀取,需要將其強制以二進制模式重新打開后再使用。

3. 程序測試與運行

程序寫好后,第一步是編譯C++程序。由于該程序需要運行在多節點的Hadoop集群上,為了避免部署或者分發動態庫帶來的麻煩,我們直接采用靜態編譯方式,這也是編寫Hadoop C++程序的基本規則。為了靜態編譯以上MapReduce程序,安裝protocol buffers時,需采用以下流程(強調第一步),

./configure –disable-shared

make –j4

make install

然后使用以下命令編譯程序,生成可執行文件ProtoMapper:

g++ -o ProtoMapper ProtoMapper.cpp person.pb.cc `pkg-config –cflags –static –libs protobuf` -lpthread

在正式將程序提交到Hadoop集群之前,需要先在本地進行測試,本地測試運行腳本如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #!/bin/bash HADOOP_HOME=/opt/dong/yarn-client INPUT_PATH=/tmp/person.seq OUTPUT_PATH=file:///tmp/output111 echo "Clearing output path: $OUTPUT_PATH" $HADOOP_HOME/bin/hadoop fs -rmr $OUTPUT_PATH ${HADOOP_HOME}/bin/hadoop jar\ ???${HADOOP_HOME}/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar\ ??-D mapred.reduce.tasks=0\ ??-D stream.map.input=rawbytes\ ??-files ProtoMapper\ ??-jt local\ ??-fs local\ ??-input $INPUT_PATH\ ??-output $OUTPUT_PATH\ ??-inputformat SequenceFileInputFormat\ ??-mapper ProtoMapper

注意以下幾點:

(1)使用stream.map.input指定輸入數據解析成rawbytes格式

(2) 使用-jt和-fs兩個參數將程序運行模式設置為local模式

(3)使用-inputformat指定輸入數據格式為SequenceFileInputFormat

(4)使用mapred.reduce.tasks將Reduce Task數目設置為0

在本地tmp/output111目錄下查看測試結果是否正確,如果沒問題,可改寫該腳本(去掉-fs和-jt兩個參數,將輸入和輸出目錄設置成HDFS上的目錄),將程序直接運行在Hadoop上。

原創文章,轉載請注明:?轉載自董的博客

本文鏈接地址:?http://dongxicheng.org/mapreduce-nextgen/hadoop-streaming-process-binary-data/

總結

以上是生活随笔為你收集整理的利用Hadoop Streaming处理二进制格式文件的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。