日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop pipes编程

發(fā)布時間:2025/3/21 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop pipes编程 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
1. Hadoop pipes編程介紹

Hadoop pipes允許C++程序員編寫mapreduce程序,它允許用戶混用C++和Java的RecordReader, Mapper, Partitioner,Rducer和RecordWriter等五個組件。關于Hadoop pipes的設計思想,可參見我這篇文章:Hadoop Pipes設計原理

本文介紹了Hadoop pipes編程的基本方法,并給出了若干編程示例,最后介紹了Hadoop pipes高級編程方法,包括怎樣在MapReduce中加載詞典,怎么傳遞參數(shù),怎樣提高效率等。

2. Hadoop pipes編程初體驗

Hadoop-0.20.2源代碼中自帶了三個pipes編程示例,它們位于目錄src/examples/pipes/impl中,分別為wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面簡要介紹一下這三個程序。

(1) wordcount-simple.cc:Mapper和Reducer組件采用C++語言編寫,RecordReader, Partitioner和RecordWriter采用Java語言編寫,其中,RecordReader 為LineRecordReader(位于InputTextInputFormat中,按行讀取數(shù)據(jù),行所在的偏移量為key,行中的字符串為value),Partitioner為PipesPartitioner,RecordWriter為LineRecordWriter(位于InputTextOutputFormat中,輸出格式為”key\tvalue\n”)

(2) wordcount-part.cc:Mapper,Partitioner和Reducer組件采用C++語言編寫,其他采用Java編寫

(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++編寫

接下來簡單介紹一下wordcount-simple.cc的編譯和運行方法。

在Hadoop的安裝目錄下,執(zhí)行下面命令:

1 ant -Dcompile.c++=yes examples

則wordcount-simple.cc生成的可執(zhí)行文件wordcount-simple被保存到了目錄build/c++-examples/Linux-amd64-64/bin/中,然后將該可執(zhí)行文件上傳到HDFS的某一個目錄下,如/user/XXX/ bin下:

1 bin/hadoop? -put? build/c++-examples/Linux-amd64-64/bin/wordcount-simple? /user/XXX/ bin/

上傳一份數(shù)據(jù)到HDFS的/user/XXX /pipes_test_data目錄下:

1 bin/hadoop? -put? data.txt? /user/XXX /pipes_test_data

直接使用下面命令提交作業(yè):

1 2 3 4 5 6 7 8 9 10 11 12 13 bin/hadoop pipes \ -D hadoop.pipes.java.recordreader=true \ -D hadoop.pipes.java.recordwriter=true \ -D mapred.job.name= wordcount \ -input /user/XXX /pipes_test_data \ -output /user/XXX /pipes_test_output \ -program /user/XXX/ bin/wordcount-simple

3. Hadoop pipes編程方法

先從最基礎的兩個組件Mapper和Reducer說起。

(1) Mapper編寫方法

用戶若要實現(xiàn)Mapper組件,需繼承HadoopPipes::Mapper虛基類,它的定義如下:

1 2 3 4 5 6 7 class Mapper: public Closable { public: virtual void map(MapContext& context) = 0; };

用戶必須實現(xiàn)map函數(shù),它的參數(shù)是MapContext,該類的聲明如下:

1 2 3 4 5 6 7 8 9 10 11 class MapContext: public TaskContext { public: virtual const std::string& getInputSplit() = 0; virtual const std::string& getInputKeyClass() = 0; virtual const std::string& getInputValueClass() = 0; };

而TaskContext類地聲明如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class TaskContext { public: class Counter { …… public: Counter(int counterId) : id(counterId) {} Counter(const Counter& counter) : id(counter.id) {} …… }; virtual const JobConf* getJobConf() = 0; virtual const std::string& getInputKey() = 0; virtual const std::string& getInputValue() = 0; virtual void emit(const std::string& key, const std::string& value) = 0; virtual void progress() = 0; ……. };

用戶可以從context參數(shù)中獲取當前的key,value,progress和inputsplit等數(shù)據(jù)信息,此外,還可以調(diào)用emit將結果回傳給Java代碼。

Mapper的構造函數(shù)帶有一個HadoopPipes::TaskContext參數(shù),用戶可以通過它注冊一些全局counter,對于程序調(diào)試和跟蹤作業(yè)進度非常有用:

如果你想注冊全局counter,在構造函數(shù)添加一些類似的代碼:

1 2 3 4 5 6 7 WordCountMap(HadoopPipes::TaskContext& context) { inputWords1 = context.getCounter(“group”, ”counter1”); inputWords2 = context.getCounter(“group”, ”counter2”); }

當需要增加counter值時,可以這樣:

1 2 3 context.incrementCounter(inputWords1, 1); context.incrementCounter(inputWords2, 1);

其中getCounter的兩個參數(shù)分別為組名和組內(nèi)計數(shù)器名,一個組中可以存在多個counter。

用戶自定義的counter會在程序結束時,輸出到屏幕上,當然,用戶可以用通過web界面看到。

(2) Reducer編寫方法

Reducer組件的編寫方法跟Mapper組件類似,它需要繼承虛基類public HadoopPipes::Reducer。

與Mapper組件唯一不同的地方時,map函數(shù)的參數(shù)類型為HadoopPipes::ReduceContext,它包含一個nextValue()方法,這允許用于遍歷當前key對應的value列表,依次進行處理。

接下來介紹RecordReader, Partitioner和RecordWriter的編寫方法:

(3) RecordReader編寫方法

用戶自定義的RecordReader類需要繼承虛基類HadoopPipes::RecordReader,它的聲明如下:

1 2 3 4 5 6 7 8 9 class RecordReader: public Closable { public: virtual bool next(std::string& key, std::string& value) = 0; virtual float getProgress() = 0; };

用戶需要實現(xiàn)next和 getProgress兩個方法。

用戶自定義的RecordReader的構造函數(shù)可攜帶類型為HadoopPipes::MapContext的參數(shù),通過該參數(shù)的getInputSplit()的方法,用戶可以獲取經(jīng)過序列化的InpuSplit對象,Java端采用不同的InputFormat可導致InputSplit對象格式不同,但對于大多數(shù)InpuSplit對象,它們可以提供至少三個信息:當前要處理的InputSplit所在的文件名,所在文件中的偏移量,它的長度。用戶獲取這三個信息后,可使用libhdfs庫讀取文件,以實現(xiàn)next方法。

下面介紹一下反序列化InputSplit對象的方法:

【1】 如果Java端采用的InputFormat為WordCountInpuFormat,可以這樣:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class XXXReader: public HadoopPipes::RecordReader { public: XXXReader (HadoopPipes::MapContext& context) { std::string filename; HadoopUtils::StringInStream stream(context.getInputSplit()); HadoopUtils::deserializeString(filename, stream); …… };

【2】 如果Java端采用的InputFormat為TextInpuFormat,可以這樣:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 class XXXReader: public HadoopPipes::RecordReader { public: XXXReader (HadoopPipes::MapContext& context) { std::string filename; HadoopUtils::StringInStream stream(context.getInputSplit()); readString(filename, stream); int start = (int)readLong(stream); int len = (int)readLong(stream); …… private: void readString(std::string& t, HadoopUtils::StringInStream& stream) { int len = readShort(stream); if (len > 0) { // resize the string to the right length t.resize(len); // read into the string in 64k chunks const int bufSize = 65536; int offset = 0; char buf[bufSize]; while (len > 0) { int chunkLength = len > bufSize ? bufSize : len; stream.read(buf, chunkLength); t.replace(offset, chunkLength, buf, chunkLength); offset += chunkLength; len -= chunkLength; } } else { t.clear(); } } long readLong(HadoopUtils::StringInStream& stream) { long n; char b; stream.read(&b, 1); n = (long)(b & 0xff) << 56 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 48 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 40 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 32 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 24 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 16 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 8 ; stream.read(&b, 1); n |= (long)(b & 0xff) ; return n; } };

(4) Partitioner編寫方法

用戶自定義的Partitioner類需要繼承虛基類HadoopPipes:: Partitioner,它的聲明如下:

1 2 3 4 5 6 7 8 9 class Partitioner { public: virtual int partition(const std::string& key, int numOfReduces) = 0; virtual ~Partitioner() {} };

用戶需要實現(xiàn)partition方法和 析構函數(shù)。

對于partition方法,框架會自動為它傳入兩個參數(shù),分別為key值和reduce task的個數(shù)numOfReduces,用戶只需返回一個0~ numOfReduces-1的值即可。

(5) RecordWriter編寫方法

用戶自定義的RecordWriter類需要繼承虛基類HadoopPipes:: RecordWriter,它的聲明如下:

1 2 3 4 5 6 7 8 9 class RecordWriter: public Closable { public: virtual void emit(const std::string& key, const std::string& value) = 0; };

用戶自定的RecordWriter的構造函數(shù)可攜帶類型為HadoopPipes::MapContext的參數(shù),通過該參數(shù)的getJobConf()可獲取一個HadoopPipes::JobConf的對象,用戶可從該對象中獲取該reduce task的各種參數(shù),如:該reduce task的編號(這對于確定輸出文件名有用),reduce task的輸出目錄等。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class MyWriter: public HadoopPipes::RecordWriter { public: MyWriter(HadoopPipes::ReduceContext& context) { const HadoopPipes::JobConf* job = context.getJobConf(); int part = job->getInt("mapred.task.partition"); std::string outDir = job->get("mapred.work.output.dir"); …… } }

用戶需實現(xiàn)emit方法,將數(shù)據(jù)寫入某個文件。

4. Hadoop pipes編程示例

網(wǎng)上有很多人懷疑Hadoop pipes自帶的程序wordcount-nopipe.cc不能運行,各個論壇都有討論,在此介紹該程序的設計原理和運行方法。

該運行需要具備以下前提:

(1) 采用的InputFormat為WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中

(2) 輸入目錄和輸出目錄需位于各個datanode的本地磁盤上,格式為:file:///home/xxx/pipes_test (注意,hdfs中的各種接口同時支持本地路徑和HDFS路徑,如果是HDFS上的路徑,需要使用hdfs://host:9000/user/xxx,表示/user/xxx為namenode 為host的hdfs上的路徑,而本地路徑,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test為本地路徑。例如,bin/hadoop fs –ls file:///home/xxx/pipes_test表示列出本地磁盤上/home/xxx/pipes_tes下的文件)

待確定好各個datanode的本地磁盤上有輸入數(shù)據(jù)/home/xxx/pipes_test/data.txt后,用戶首先上傳可執(zhí)行文件到HDFS中:

1 bin/hadoop? -put? build/c++-examples/Linux-amd64-64/bin/wordcount-nopipe? /user/XXX/bin/

然后使用下面命令提交該作業(yè):

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 bin/hadoop pipes \ -D hadoop.pipes.java.recordreader=false \ -D hadoop.pipes.java.recordwriter=false \ -D mapred.job.name=wordcount \ -D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \ -libjars hadoop-0.20.2-test.jar \ -input file:///home/xxx/pipes_test/data.txt \ -output file:///home/xxx/pipes_output \ -program /user/XXX/bin/wordcount-nopipe

5. Hadoop pipes高級編程

如果用戶需要在mapreduce作業(yè)中加載詞典或者傳遞參數(shù),可這樣做:

(1) 提交作業(yè)時,用-files選項,將詞典(需要傳遞參數(shù)可以放到一個配置文件中)上傳給各個datanode,如:

1 2 3 4 5 6 7 8 9 10 11 bin/hadoop pipes \ -D hadoop.pipes.java.recordreader=false \ -D hadoop.pipes.java.recordwriter=false \ -D mapred.job.name=wordcount \ -files dic.txt \ ….

(2) 在Mapper或者Reducer的構造函數(shù)中,將字典文件以本地文件的形式打開,并把內(nèi)容保存到一個map或者set中,然后再map()或者reduce()函數(shù)中使用即可,如:

1 2 3 4 5 6 7 WordCountMap(HadoopPipes::TaskContext& context) { file = fopen(“dic.txt”, "r"); //C庫函數(shù) ……. }

為了提高系能,RecordReader和RecordWriter最好采用Java代碼實現(xiàn)(或者重用Hadoop中自帶的),這是因為Hadoop自帶的C++庫libhdfs采用JNI實現(xiàn),底層還是要調(diào)用Java相關接口,效率很低,此外,如果要處理的文件為二進制文件或者其他非文本文件,libhdfs可能不好處理。

6. 總結

Hadoop pipes使C++程序員編寫MapReduce作業(yè)變得可能,它簡單好用,提供了用戶所需的大部分功能。

1.Hadoop pipes編程介紹

Hadoop pipes允許C++程序員編寫mapreduce程序,它允許用戶混用C++和Java的RecordReader,Mapper,Partitioner,Rducer和RecordWriter等五個組件。關于Hadoop pipes的設計思想,可參見我這篇文章:

本文介紹了Hadoop pipes編程的基本方法,并給出了若干編程示例,最后介紹了Hadoop pipes高級編程方法,包括怎樣在MapReduce中加載詞典,怎么傳遞參數(shù),怎樣提高效率等。

2.Hadoop pipes編程初體驗

Hadoop-0.20.2源代碼中自帶了三個pipes編程示例,它們位于目錄src/examples/pipes/impl中,分別為wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面簡要介紹一下這三個程序。

(1)wordcount-simple.cc:Mapper和Reducer組件采用C++語言編寫,RecordReader, Partitioner和RecordWriter采用Java語言編寫,其中,RecordReader為LineRecordReader(位于InputTextInputFormat中,按行讀取數(shù)據(jù),行所在的偏移量為key,行中的字符串為value),Partitioner為PipesPartitioner,RecordWriter為LineRecordWriter(位于InputTextOutputFormat中,輸出格式為”key\tvalue\n”)

(2)wordcount-part.cc:Mapper,Partitioner和Reducer組件采用C++語言編寫,其他采用Java編寫

(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++編寫

接下來簡單介紹一下wordcount-simple.cc的編譯和運行方法。

在Hadoop的安裝目錄下,執(zhí)行下面命令:

ant -Dcompile.c++=yes examples

則wordcount-simple.cc生成的可執(zhí)行文件wordcount-simple被保存到了目錄build/c++-examples/Linux-amd64-64/bin/中,然后將該可執(zhí)行文件上傳到HDFS的某一個目錄下,如/user/XXX/ bin下:

bin/hadoop-putbuild/c++-examples/Linux-amd64-64/bin/wordcount-simple/user/XXX/ bin/

上傳一份數(shù)據(jù)到HDFS的/user/XXX /pipes_test_data目錄下:

bin/hadoop-putdata.txt/user/XXX /pipes_test_data

直接使用下面命令提交作業(yè):

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=true \

-D hadoop.pipes.java.recordwriter=true \

-D mapred.job.name= wordcount \

-input /user/XXX /pipes_test_data \

-output /user/XXX /pipes_test_output \

-program /user/XXX/ bin/wordcount-simple

3.Hadoop pipes編程方法

先從最基礎的兩個組件Mapper和Reducer說起。

(1)Mapper編寫方法

用戶若要實現(xiàn)Mapper組件,需繼承HadoopPipes::Mapper虛基類,它的定義如下:

class Mapper: public Closable {

public:

virtual void map(MapContext& context) = 0;

};

用戶必須實現(xiàn)map函數(shù),它的參數(shù)是MapContext,該類的聲明如下:

class MapContext: public TaskContext {

public:

virtual const std::string& getInputSplit() = 0;

virtual const std::string& getInputKeyClass() = 0;

virtual const std::string& getInputValueClass() = 0;

};

而TaskContext類地聲明如下:

class TaskContext {

public:

class Counter {

……

public:

Counter(int counterId) : id(counterId) {}

Counter(const Counter& counter) : id(counter.id) {}

……

};

virtual const JobConf* getJobConf() = 0;

virtual const std::string& getInputKey() = 0;

virtual const std::string& getInputValue() = 0;

virtual void emit(const std::string& key, const std::string& value) = 0;

virtual void progress() = 0;

…….

};

用戶可以從context參數(shù)中獲取當前的key,value,progress和inputsplit等數(shù)據(jù)信息,此外,還可以調(diào)用emit將結果回傳給Java代碼。

Mapper的構造函數(shù)帶有一個HadoopPipes::TaskContext參數(shù),用戶可以通過它注冊一些全局counter,對于程序調(diào)試和跟蹤作業(yè)進度非常有用:

如果你想注冊全局counter,在構造函數(shù)添加一些類似的代碼:

WordCountMap(HadoopPipes::TaskContext& context) {

inputWords1 = context.getCounter(“group”, ”counter1”);

inputWords2 = context.getCounter(“group”, ”counter2”);

}

當需要增加counter值時,可以這樣:

context.incrementCounter(inputWords1, 1);

context.incrementCounter(inputWords2, 1);

其中getCounter的兩個參數(shù)分別為組名和組內(nèi)計數(shù)器名,一個組中可以存在多個counter。

用戶自定義的counter會在程序結束時,輸出到屏幕上,當然,用戶可以用通過web界面看到。

(2)Reducer編寫方法

Reducer組件的編寫方法跟Mapper組件類似,它需要繼承虛基類public HadoopPipes::Reducer。

與Mapper組件唯一不同的地方時,map函數(shù)的參數(shù)類型為HadoopPipes::ReduceContext,它包含一個nextValue()方法,這允許用于遍歷當前key對應的value列表,依次進行處理。

接下來介紹RecordReader,Partitioner和RecordWriter的編寫方法:

(3)RecordReader編寫方法

用戶自定義的RecordReader類需要繼承虛基類HadoopPipes::RecordReader,它的聲明如下:

class RecordReader: public Closable {

public:

virtual bool next(std::string& key, std::string& value) = 0;

virtual float getProgress() = 0;

};

用戶需要實現(xiàn)next和getProgress兩個方法。

用戶自定義的RecordReader的構造函數(shù)可攜帶類型為HadoopPipes::MapContext的參數(shù),通過該參數(shù)的getInputSplit()的方法,用戶可以獲取經(jīng)過序列化的InpuSplit對象,Java端采用不同的InputFormat可導致InputSplit對象格式不同,但對于大多數(shù)InpuSplit對象,它們可以提供至少三個信息:當前要處理的InputSplit所在的文件名,所在文件中的偏移量,它的長度。用戶獲取這三個信息后,可使用libhdfs庫讀取文件,以實現(xiàn)next方法。

(4)Partitioner編寫方法

用戶自定義的Partitioner類需要繼承虛基類HadoopPipes:: Partitioner,它的聲明如下:

class Partitioner {

public:

virtual int partition(const std::string& key, int numOfReduces) = 0;

virtual ~Partitioner() {}

};

用戶需要實現(xiàn)partition方法和析構函數(shù)。

對于partition方法,框架會自動為它傳入兩個參數(shù),分別為key值和reduce task的個數(shù)numOfReduces,用戶只需返回一個0~ numOfReduces-1的值即可。

(5)RecordWriter編寫方法

用戶自定義的RecordWriter類需要繼承虛基類HadoopPipes:: RecordWriter,它的聲明如下:

class RecordWriter: public Closable {

public:

virtual void emit(const std::string& key,

const std::string& value) = 0;

};

用戶自定的RecordWriter的構造函數(shù)可攜帶類型為HadoopPipes::MapContext的參數(shù),通過該參數(shù)的getJobConf()可獲取一個HadoopPipes::JobConf的對象,用戶可從該對象中獲取該reduce task的各種參數(shù),如:該reduce task的編號(這對于確定輸出文件名有用),reduce task的輸出目錄等。

class WordCountWriter: public HadoopPipes::RecordWriter {

public:

MyWriter(HadoopPipes::ReduceContext& context) {

const HadoopPipes::JobConf* job = context.getJobConf();

int part = job->getInt(“mapred.task.partition”);

std::string outDir = job->get(“mapred.work.output.dir”);

……

}

}

用戶需實現(xiàn)emit方法,將數(shù)據(jù)寫入某個文件。

4.Hadoop pipes編程示例

網(wǎng)上有很多人懷疑Hadoop pipes自帶的程序wordcount-nopipe.cc不能運行,各個論壇都有討論,在此介紹該程序的設計原理和運行方法。

該運行需要具備以下前提:

(1)?采用的InputFormat為WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中

(2)?輸入目錄和輸出目錄需位于各個datanode的本地磁盤上,格式為:file:///home/xxx/pipes_test(注意,hdfs中的各種接口同時支持本地路徑和HDFS路徑,如果是HDFS上的路徑,需要使用hdfs://host:9000/user/xxx,表示/user/xxx為namenode為host的hdfs上的路徑,而本地路徑,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test為本地路徑)

待確定好各個datanode的本地磁盤上有輸入數(shù)據(jù)/home/xxx/pipes_test/data.txt后,用戶首先上傳可執(zhí)行文件到HDFS中:

bin/hadoop-putbuild/c++-examples/Linux-amd64-64/bin/wordcount-simple/user/XXX/ bin/

然后使用下面命令運行該程序:

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \

-libjars hadoop-0.20.2-test.jar \

-input file:/home/xxx/pipes_test/data.txt \

-output file:/home/xxx/pipes_output \

-program /user/XXX/ bin/wordcount-nopipe

5.Hadoop pipes高級編程

如果用戶需要在mapreduce作業(yè)中加載詞典或者傳遞參數(shù),可這樣做:

(1)?提交作業(yè)時,用-files選項,將詞典(需要傳遞參數(shù)可以放到一個配置文件中)上傳給各個datanode,如

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-files dic.txt \

….

(2)在Mapper或者Reducer的構造函數(shù)中,將字典文件以本地文件的形式打開,并把內(nèi)容保存到一個map或者set中,然后再map()或者reduce()函數(shù)中使用即可,如

WordCountMap(HadoopPipes::TaskContext& context) {

file = fopen(“dic.txt”, “r”); //C庫函數(shù)

…….

}

為了提高系能,RecordReader和RecordWriter最好采用Java代碼實現(xiàn)(或者重用Hadoop中自帶的),這是因為Hadoop自帶的C++庫libhdfs采用JNI實現(xiàn),底層還是要調(diào)用Java相關接口,效率很低,此外,如果要處理的文件為二進制文件或者其他非文本文件,libhdfs可能不好處理。

6.總結

1. Hadoop pipes編程介紹

Hadoop pipes允許C++程序員編寫mapreduce程序,它允許用戶混用C++和Java的RecordReader,Mapper,Partitioner,Rducer和RecordWriter等五個組件。關于Hadoop pipes的設計思想,可參見我這篇文章:

本文介紹了Hadoop pipes編程的基本方法,并給出了若干編程示例,最后介紹了Hadoop pipes高級編程方法,包括怎樣在MapReduce中加載詞典,怎么傳遞參數(shù),怎樣提高效率等。

2. Hadoop pipes編程初體驗

Hadoop-0.20.2源代碼中自帶了三個pipes編程示例,它們位于目錄src/examples/pipes/impl中,分別為wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面簡要介紹一下這三個程序。

(1) wordcount-simple.cc:Mapper和Reducer組件采用C++語言編寫,RecordReader, Partitioner和RecordWriter采用Java語言編寫,其中,RecordReader 為LineRecordReader(位于InputTextInputFormat中,按行讀取數(shù)據(jù),行所在的偏移量為key,行中的字符串為value),Partitioner為PipesPartitioner,RecordWriter為LineRecordWriter(位于InputTextOutputFormat中,輸出格式為”key\tvalue\n”)

(2) wordcount-part.cc:Mapper,Partitioner和Reducer組件采用C++語言編寫,其他采用Java編寫

(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++編寫

接下來簡單介紹一下wordcount-simple.cc的編譯和運行方法。

在Hadoop的安裝目錄下,執(zhí)行下面命令:

ant -Dcompile.c++=yes examples

則wordcount-simple.cc生成的可執(zhí)行文件wordcount-simple被保存到了目錄build/c++-examples/Linux-amd64-64/bin/中,然后將該可執(zhí)行文件上傳到HDFS的某一個目錄下,如/user/XXX/ bin下:

bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/

上傳一份數(shù)據(jù)到HDFS的/user/XXX /pipes_test_data目錄下:

bin/hadoop -put data.txt /user/XXX /pipes_test_data

直接使用下面命令提交作業(yè):

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=true \

-D hadoop.pipes.java.recordwriter=true \

-D mapred.job.name= wordcount \

-input /user/XXX /pipes_test_data \

-output /user/XXX /pipes_test_output \

-program /user/XXX/ bin/wordcount-simple

3. Hadoop pipes編程方法

先從最基礎的兩個組件Mapper和Reducer說起。

(1) Mapper編寫方法

用戶若要實現(xiàn)Mapper組件,需繼承HadoopPipes::Mapper虛基類,它的定義如下:

class Mapper: public Closable {

public:

virtual void map(MapContext& context) = 0;

};

用戶必須實現(xiàn)map函數(shù),它的參數(shù)是MapContext,該類的聲明如下:

class MapContext: public TaskContext {

public:

virtual const std::string& getInputSplit() = 0;

virtual const std::string& getInputKeyClass() = 0;

virtual const std::string& getInputValueClass() = 0;

};

而TaskContext類地聲明如下:

class TaskContext {

public:

class Counter {

……

public:

Counter(int counterId) : id(counterId) {}

Counter(const Counter& counter) : id(counter.id) {}

……

};

virtual const JobConf* getJobConf() = 0;

virtual const std::string& getInputKey() = 0;

virtual const std::string& getInputValue() = 0;

virtual void emit(const std::string& key, const std::string& value) = 0;

virtual void progress() = 0;

…….

};

用戶可以從context參數(shù)中獲取當前的key,value,progress和inputsplit等數(shù)據(jù)信息,此外,還可以調(diào)用emit將結果回傳給Java代碼。

Mapper的構造函數(shù)帶有一個HadoopPipes::TaskContext參數(shù),用戶可以通過它注冊一些全局counter,對于程序調(diào)試和跟蹤作業(yè)進度非常有用:

如果你想注冊全局counter,在構造函數(shù)添加一些類似的代碼:

WordCountMap(HadoopPipes::TaskContext& context) {

inputWords1 = context.getCounter(“group”, ”counter1”);

inputWords2 = context.getCounter(“group”, ”counter2”);

}

當需要增加counter值時,可以這樣:

context.incrementCounter(inputWords1, 1);

context.incrementCounter(inputWords2, 1);

其中getCounter的兩個參數(shù)分別為組名和組內(nèi)計數(shù)器名,一個組中可以存在多個counter。

用戶自定義的counter會在程序結束時,輸出到屏幕上,當然,用戶可以用通過web界面看到。

(2) Reducer編寫方法

Reducer組件的編寫方法跟Mapper組件類似,它需要繼承虛基類public HadoopPipes::Reducer。

與Mapper組件唯一不同的地方時,map函數(shù)的參數(shù)類型為HadoopPipes::ReduceContext,它包含一個nextValue()方法,這允許用于遍歷當前key對應的value列表,依次進行處理。

接下來介紹RecordReader, Partitioner和RecordWriter的編寫方法:

(3) RecordReader編寫方法

用戶自定義的RecordReader類需要繼承虛基類HadoopPipes::RecordReader,它的聲明如下:

class RecordReader: public Closable {

public:

virtual bool next(std::string& key, std::string& value) = 0;

virtual float getProgress() = 0;

};

用戶需要實現(xiàn)next和 getProgress兩個方法。

用戶自定義的RecordReader的構造函數(shù)可攜帶類型為HadoopPipes::MapContext的參數(shù),通過該參數(shù)的getInputSplit()的方法,用戶可以獲取經(jīng)過序列化的InpuSplit對象,Java端采用不同的InputFormat可導致InputSplit對象格式不同,但對于大多數(shù)InpuSplit對象,它們可以提供至少三個信息:當前要處理的InputSplit所在的文件名,所在文件中的偏移量,它的長度。用戶獲取這三個信息后,可使用libhdfs庫讀取文件,以實現(xiàn)next方法。

(4) Partitioner編寫方法

用戶自定義的Partitioner類需要繼承虛基類HadoopPipes:: Partitioner,它的聲明如下:

class Partitioner {

public:

virtual int partition(const std::string& key, int numOfReduces) = 0;

virtual ~Partitioner() {}

};

用戶需要實現(xiàn)partition方法和 析構函數(shù)。

對于partition方法,框架會自動為它傳入兩個參數(shù),分別為key值和reduce task的個數(shù)numOfReduces,用戶只需返回一個0~ numOfReduces-1的值即可。

(5) RecordWriter編寫方法

用戶自定義的RecordWriter類需要繼承虛基類HadoopPipes:: RecordWriter,它的聲明如下:

class RecordWriter: public Closable {

public:

virtual void emit(const std::string& key,

const std::string& value) = 0;

};

用戶自定的RecordWriter的構造函數(shù)可攜帶類型為HadoopPipes::MapContext的參數(shù),通過該參數(shù)的getJobConf()可獲取一個HadoopPipes::JobConf的對象,用戶可從該對象中獲取該reduce task的各種參數(shù),如:該reduce task的編號(這對于確定輸出文件名有用),reduce task的輸出目錄等。

class WordCountWriter: public HadoopPipes::RecordWriter {

public:

MyWriter(HadoopPipes::ReduceContext& context) {

const HadoopPipes::JobConf* job = context.getJobConf();

int part = job->getInt(“mapred.task.partition”);

std::string outDir = job->get(“mapred.work.output.dir”);

……

}

}

用戶需實現(xiàn)emit方法,將數(shù)據(jù)寫入某個文件。

4. Hadoop pipes編程示例

網(wǎng)上有很多人懷疑Hadoop pipes自帶的程序wordcount-nopipe.cc不能運行,各個論壇都有討論,在此介紹該程序的設計原理和運行方法。

該運行需要具備以下前提:

(1) 采用的InputFormat為WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中

(2) 輸入目錄和輸出目錄需位于各個datanode的本地磁盤上,格式為:file:///home/xxx/pipes_test (注意,hdfs中的各種接口同時支持本地路徑和HDFS路徑,如果是HDFS上的路徑,需要使用hdfs://host:9000/user/xxx,表示/user/xxx為namenode 為host的hdfs上的路徑,而本地路徑,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test為本地路徑)

待確定好各個datanode的本地磁盤上有輸入數(shù)據(jù)/home/xxx/pipes_test/data.txt后,用戶首先上傳可執(zhí)行文件到HDFS中:

bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/

然后使用下面命令運行該程序:

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \

-libjars hadoop-0.20.2-test.jar \

-input file:/home/xxx/pipes_test/data.txt \

-output file:/home/xxx/pipes_output \

-program /user/XXX/ bin/wordcount-nopipe

5. Hadoop pipes高級編程

如果用戶需要在mapreduce作業(yè)中加載詞典或者傳遞參數(shù),可這樣做:

(1) 提交作業(yè)時,用-files選項,將詞典(需要傳遞參數(shù)可以放到一個配置文件中)上傳給各個datanode,如

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-files dic.txt \

….

(2) 在Mapper或者Reducer的構造函數(shù)中,將字典文件以本地文件的形式打開,并把內(nèi)容保存到一個map或者set中,然后再map()或者reduce()函數(shù)中使用即可,如

WordCountMap(HadoopPipes::TaskContext& context) {

file = fopen(“dic.txt”, “r”); //C庫函數(shù)

…….

}

為了提高系能,RecordReader和RecordWriter最好采用Java代碼實現(xiàn)(或者重用Hadoop中自帶的),這是因為Hadoop自帶的C++庫libhdfs采用JNI實現(xiàn),底層還是要調(diào)用Java相關接口,效率很低,此外,如果要處理的文件為二進制文件或者其他非文本文件,libhdfs可能不好處理。

6. 總結

Hadoop pipes使C++程序員編寫MapReduce作業(yè)變得可能,它簡單好用,提供了用戶所需的大部分功能。

Hadoop pipes使C++程序員編寫MapReduce作業(yè)變得可能,它簡單好用,提供了用戶所需的大部分功能。

原創(chuàng)文章,轉載請注明:?轉載自董的博客

本文鏈接地址:?http://dongxicheng.org/mapreduce/hadoop-pipes-programming/

總結

以上是生活随笔為你收集整理的Hadoop pipes编程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。