日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop Streaming

發(fā)布時(shí)間:2025/3/21 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop Streaming 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Hadoop Streaming

Hadoop Streaming

??? Hadoop Streaming
??? Streaming工作原理
??? 將文件打包到提交的作業(yè)中
??? Streaming選項(xiàng)與用法
??????? 只使用Mapper的作業(yè)
??????? 為作業(yè)指定其他插件
??????? Hadoop Streaming中的大文件和檔案
??????? 為作業(yè)指定附加配置參數(shù)
??????? 其他選項(xiàng)
??? 其他例子
??????? 使用自定義的方法切分行來形成Key/Value對
??????? 一個(gè)實(shí)用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項(xiàng))
??????? Hadoop聚合功能包的使用(-reduce aggregate 選項(xiàng))
??????? 字段的選取(類似于unix中的 'cut' 命令)
??? 常見問題
??????? 我該怎樣使用Hadoop Streaming運(yùn)行一組獨(dú)立(相關(guān))的任務(wù)呢?
??????? 如何處理多個(gè)文件,其中每個(gè)文件一個(gè)map?
??????? 應(yīng)該使用多少個(gè)reducer?
??????? 如果在Shell腳本里設(shè)置一個(gè)別名,并放在-mapper之后,Streaming會(huì)正常運(yùn)行嗎?例如,alias cl='cut -fl',-mapper "cl"會(huì)運(yùn)行正常嗎?
??????? 我可以使用UNIX pipes嗎?例如 –mapper "cut –fl | set s/foo/bar/g"管用么?
??????? 在streaming作業(yè)中用-file選項(xiàng)運(yùn)行一個(gè)分布式的超大可執(zhí)行文件(例如,3.6G)時(shí),我得到了一個(gè)錯(cuò)誤信息“No space left on device”。如何解決?
??????? 如何設(shè)置多個(gè)輸入目錄?
??????? 如何生成gzip格式的輸出文件?
??????? Streaming中如何自定義input/output format?
??????? Streaming如何解析XML文檔?
??????? 在streaming應(yīng)用程序中如何更新計(jì)數(shù)器?
??????? 如何更新streaming應(yīng)用程序的狀態(tài)?


Hadoop streaming是Hadoop的一個(gè)工具, 它幫助用戶創(chuàng)建和運(yùn)行一類特殊的map/reduce作業(yè), 這些特殊的map/reduce作業(yè)是由一些可執(zhí)行文件或腳本文件充當(dāng)mapper或者reducer。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper /bin/cat \-reducer /bin/wc

Streaming工作原理

在上面的例子里,mapper和reducer都是可執(zhí)行文件,它們從標(biāo)準(zhǔn)輸入讀入數(shù)據(jù)(一行一行讀),并把計(jì)算結(jié)果發(fā)給標(biāo)準(zhǔn)輸出。Streaming工具會(huì)創(chuàng)建一個(gè)Map/Reduce作業(yè),并把它發(fā)送給合適的集群,同時(shí)監(jiān)視這個(gè)作業(yè)的整個(gè)執(zhí)行過程。

如果一個(gè)可執(zhí)行文件被用于mapper,則在mapper初始化時(shí),每一個(gè)mapper任務(wù)會(huì)把這個(gè)可執(zhí)行文件作為一個(gè)單獨(dú)的進(jìn)程啟動(dòng)。mapper任務(wù)運(yùn)行時(shí),它把輸入切分成行并把每一行提供給可執(zhí)行文件進(jìn)程的標(biāo)準(zhǔn)輸入。同時(shí),mapper收集可執(zhí)行文件進(jìn)程標(biāo)準(zhǔn)輸出的內(nèi)容,并把收到的每一行內(nèi)容轉(zhuǎn)化成key/value對,作為mapper的輸出。默認(rèn)情況下,一行中第一個(gè)tab之前的部分作為key,之后的(不包括tab)作為value。如果沒有tab,整行作為key值,value值為null。不過,這可以定制,在下文中將會(huì)討論如何自定義key和value的切分方式。

如果一個(gè)可執(zhí)行文件被用于reducer,每個(gè)reducer任務(wù)會(huì)把這個(gè)可執(zhí)行文件作為一個(gè)單獨(dú)的進(jìn)程啟動(dòng)。Reducer任務(wù)運(yùn)行時(shí),它把輸入切分成行并把每一行提供給可執(zhí)行文件進(jìn)程的標(biāo)準(zhǔn)輸入。同時(shí),reducer收集可執(zhí)行文件進(jìn)程標(biāo)準(zhǔn)輸出的內(nèi)容,并把每一行內(nèi)容轉(zhuǎn)化成key/value對,作為reducer的輸出。默認(rèn)情況下,一行中第一個(gè)tab之前的部分作為key,之后的(不包括tab)作為value。在下文中將會(huì)討論如何自定義key和value的切分方式。

這是Map/Reduce框架和streaming mapper/reducer之間的基本通信協(xié)議。

用戶也可以使用java類作為mapper或者reducer。上面的例子與這里的代碼等價(jià):

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer /bin/wc

用戶可以設(shè)定stream.non.zero.exit.is.failure truefalse 來表明streaming task的返回值非零時(shí)是Failure 還是Success。默認(rèn)情況,streaming task返回非零時(shí)表示失敗。

將文件打包到提交的作業(yè)中

任何可執(zhí)行文件都可以被指定為mapper/reducer。這些可執(zhí)行文件不需要事先存放在集群上;如果在集群上還沒有,則需要用-file選項(xiàng)讓framework把可執(zhí)行文件作為作業(yè)的一部分,一起打包提交。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myPythonScript.py \-reducer /bin/wc \-file myPythonScript.py

上面的例子描述了一個(gè)用戶把可執(zhí)行python文件作為mapper。其中的選項(xiàng)“-file myPythonScirpt.py”使可執(zhí)行python文件作為作業(yè)提交的一部分被上傳到集群的機(jī)器上。

除了可執(zhí)行文件外,其他mapper或reducer需要用到的輔助文件(比如字典,配置文件等)也可以用這種方式打包上傳。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myPythonScript.py \-reducer /bin/wc \-file myPythonScript.py \-file myDictionary.txt

Streaming選項(xiàng)與用法

只使用Mapper的作業(yè)

有時(shí)只需要map函數(shù)處理輸入數(shù)據(jù)。這時(shí)只需把mapred.reduce.tasks設(shè)置為零,Map/reduce框架就不會(huì)創(chuàng)建reducer任務(wù),mapper任務(wù)的輸出就是整個(gè)作業(yè)的最終輸出。

為了做到向下兼容,Hadoop Streaming也支持“-reduce None”選項(xiàng),它與“-jobconf mapred.reduce.tasks=0”等價(jià)。

為作業(yè)指定其他插件

和其他普通的Map/Reduce作業(yè)一樣,用戶可以為streaming作業(yè)指定其他插件:

-inputformat JavaClassName-outputformat JavaClassName-partitioner JavaClassName-combiner JavaClassName

用于處理輸入格式的類要能返回Text類型的key/value對。如果不指定輸入格式,則默認(rèn)會(huì)使用TextInputFormat。因?yàn)門extInputFormat得到的key值是LongWritable類型的(其實(shí)key值并不是輸入文件中的內(nèi)容,而是value偏移量),所以key會(huì)被丟棄,只把value用管道方式發(fā)給mapper。

用戶提供的定義輸出格式的類需要能夠處理Text類型的key/value對。如果不指定輸出格式,則默認(rèn)會(huì)使用TextOutputFormat類。

Hadoop Streaming中的大文件和檔案

任務(wù)使用-cacheFile和-cacheArchive選項(xiàng)在集群中分發(fā)文件和檔案,選項(xiàng)的參數(shù)是用戶已上傳至HDFS的文件或檔案的URI。這些文件和檔案在不同的作業(yè)間緩存。用戶可以通過fs.default.name.config配置參數(shù)的值得到文件所在的host和fs_port。

這個(gè)是使用-cacheFile選項(xiàng)的例子:

-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink

在上面的例子里,url中#后面的部分是建立在任務(wù)當(dāng)前工作目錄下的符號(hào)鏈接的名字。這里的任務(wù)的當(dāng)前工作目錄下有一個(gè)“testlink”符號(hào)鏈接,它指向testfile.txt文件在本地的拷貝。如果有多個(gè)文件,選項(xiàng)可以寫成:

-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2

-cacheArchive選項(xiàng)用于把jar文件拷貝到任務(wù)當(dāng)前工作目錄并自動(dòng)把jar文件解壓縮。例如:

-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3

在上面的例子中,testlink3是當(dāng)前工作目錄下的符號(hào)鏈接,它指向testfile.jar解壓后的目錄。

下面是使用-cacheArchive選項(xiàng)的另一個(gè)例子。其中,input.txt文件有兩行內(nèi)容,分別是兩個(gè)文件的名字:testlink/cache.txt和testlink/cache2.txt。“testlink”是指向檔案目錄(jar文件解壓后的目錄)的符號(hào)鏈接,這個(gè)目錄下有“cache.txt”和“cache2.txt”兩個(gè)文件。

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input "/user/me/samples/cachefile/input.txt" \-mapper "xargs cat" \-reducer "cat" \-output "/user/me/samples/cachefile/out" \ -cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \ -jobconf mapred.map.tasks=1 \-jobconf mapred.reduce.tasks=1 \ -jobconf mapred.job.name="Experiment"$ ls test_jar/ cache.txt cache2.txt$ jar cvf cachedir.jar -C test_jar/ . added manifest adding: cache.txt(in = 30) (out= 29)(deflated 3%) adding: cache2.txt(in = 37) (out= 35)(deflated 5%)$ hadoop dfs -put cachedir.jar samples/cachefile$ hadoop dfs -cat /user/me/samples/cachefile/input.txt testlink/cache.txt testlink/cache2.txt$ cat test_jar/cache.txt This is just the cache string$ cat test_jar/cache2.txt This is just the second cache string$ hadoop dfs -ls /user/me/samples/cachefile/out Found 1 items /user/me/samples/cachefile/out/part-00000 <r 3> 69$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000 This is just the cache string This is just the second cache string

為作業(yè)指定附加配置參數(shù)

用戶可以使用“-jobconf <n>=<v>”增加一些配置變量。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper\-reducer /bin/wc \-jobconf mapred.reduce.tasks=2

上面的例子中,-jobconf mapred.reduce.tasks=2表明用兩個(gè)reducer完成作業(yè)。

關(guān)于jobconf參數(shù)的更多細(xì)節(jié)可以參考:hadoop-default.html

其他選項(xiàng)

Streaming 作業(yè)的其他選項(xiàng)如下表:

選項(xiàng)可選/必須描述
-cluster name可選在本地Hadoop集群與一個(gè)或多個(gè)遠(yuǎn)程集群間切換
-dfs host:port or local可選覆蓋作業(yè)的HDFS配置
-jt host:port or local可選覆蓋作業(yè)的JobTracker配置
-additionalconfspec specfile可選用一個(gè)類似于hadoop-site.xml的XML文件保存所有配置,從而不需要用多個(gè)"-jobconf name=value"類型的選項(xiàng)單獨(dú)為每個(gè)配置變量賦值
-cmdenv name=value可選傳遞環(huán)境變量給streaming命令
-cacheFile fileNameURI可選指定一個(gè)上傳到HDFS的文件
-cacheArchive fileNameURI可選指定一個(gè)上傳到HDFS的jar文件,這個(gè)jar文件會(huì)被自動(dòng)解壓縮到當(dāng)前工作目錄下
-inputreader JavaClassName可選為了向下兼容:指定一個(gè)record reader類(而不是input format類)
-verbose可選詳細(xì)輸出

使用-cluster <name>實(shí)現(xiàn)“本地”Hadoop和一個(gè)或多個(gè)遠(yuǎn)程Hadoop集群間切換。默認(rèn)情況下,使用hadoop-default.xml和hadoop-site.xml;當(dāng)使用-cluster <name>選項(xiàng)時(shí),會(huì)使用$HADOOP_HOME/conf/hadoop-<name>.xml。

下面的選項(xiàng)改變temp目錄:

-jobconf dfs.data.dir=/tmp

下面的選項(xiàng)指定其他本地temp目錄:

-jobconf mapred.local.dir=/tmp/local-jobconf mapred.system.dir=/tmp/system-jobconf mapred.temp.dir=/tmp/temp

更多有關(guān)jobconf的細(xì)節(jié)請參考:http://wiki.apache.org/hadoop/JobConfFile

在streaming命令中設(shè)置環(huán)境變量:

-cmdenv EXAMPLE_DIR=/home/example/dictionaries/

其他例子

使用自定義的方法切分行來形成Key/Value對

之前已經(jīng)提到,當(dāng)Map/Reduce框架從mapper的標(biāo)準(zhǔn)輸入讀取一行時(shí),它把這一行切分為key/value對。在默認(rèn)情況下,每行第一個(gè)tab符之前的部分作為key,之后的部分作為value(不包括tab符)。

但是,用戶可以自定義,可以指定分隔符是其他字符而不是默認(rèn)的tab符,或者指定在第n(n>=1)個(gè)分割符處分割而不是默認(rèn)的第一個(gè)。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer \-jobconf stream.map.output.field.separator=. \-jobconf stream.num.map.output.key.fields=4

在上面的例子,“-jobconf stream.map.output.field.separator=.”指定“.”作為map輸出內(nèi)容的分隔符,并且從在第四個(gè)“.”之前的部分作為key,之后的部分作為value(不包括這第四個(gè)“.”)。 如果一行中的“.”少于四個(gè),則整行的內(nèi)容作為key,value設(shè)為空的Text對象(就像這樣創(chuàng)建了一個(gè)Text:new Text(""))。

同樣,用戶可以使用“-jobconf stream.reduce.output.field.separator=SEP”和“-jobconf stream.num.reduce.output.fields=NUM”來指定reduce輸出的行中,第幾個(gè)分隔符處分割key和value。

一個(gè)實(shí)用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項(xiàng))

Hadoop有一個(gè)工具類org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner,它在應(yīng)用程序中很有用。Map/reduce框架用這個(gè)類切分map的輸出,切分是基于key值的前綴,而不是整個(gè)key。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \-jobconf stream.map.output.field.separator=. \-jobconf stream.num.map.output.key.fields=4 \-jobconf map.output.key.field.separator=. \-jobconf num.key.fields.for.partition=2 \-jobconf mapred.reduce.tasks=12

其中,-jobconf stream.map.output.field.separator=. 和-jobconf stream.num.map.output.key.fields=4是前文中的例子。Streaming用這兩個(gè)變量來得到mapper的key/value對。

上面的Map/Reduce 作業(yè)中map輸出的key一般是由“.”分割成的四塊。但是因?yàn)槭褂昧?jobconf num.key.fields.for.partition=2 選項(xiàng),所以Map/Reduce框架使用key的前兩塊來切分map的輸出。其中,-jobconf map.output.key.field.separator=.指定了這次切分使用的key的分隔符。這樣可以保證在所有key/value對中,key值前兩個(gè)塊值相同的所有key被分到一組,分配給一個(gè)reducer。

這種高效的方法等價(jià)于指定前兩塊作為主鍵,后兩塊作為副鍵。主鍵用于切分塊,主鍵和副鍵的組合用于排序。一個(gè)簡單的示例如下:

Map的輸出(key)

11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2

切分給3個(gè)reducer(前兩塊的值用于切分)

11.11.4.1 ----------- 11.12.1.2 11.12.1.1 ----------- 11.14.2.3 11.14.2.2

在每個(gè)切分后的組內(nèi)排序(四個(gè)塊的值都用于排序)

11.11.4.1 ----------- 11.12.1.1 11.12.1.2 ----------- 11.14.2.2 11.14.2.3

Hadoop聚合功能包的使用(-reduce aggregate 選項(xiàng))

Hadoop有一個(gè)工具包“Aggregate”(https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate)。“Aggregate”提供一個(gè)特殊的reducer類和一個(gè)特殊的combiner類,并且有一系列的“聚合器”(“aggregator”)(例如“sum”,“max”,“min”等)用于聚合一組value的序列。用戶可以使用Aggregate定義一個(gè)mapper插件類,這個(gè)類用于為mapper輸入的每個(gè)key/value對產(chǎn)生“可聚合項(xiàng)”。combiner/reducer利用適當(dāng)?shù)木酆掀骶酆线@些可聚合項(xiàng)。

要使用Aggregate,只需指定“-reducer aggregate”:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myAggregatorForKeyCount.py \-reducer aggregate \-file myAggregatorForKeyCount.py \-jobconf mapred.reduce.tasks=12

python程序myAggregatorForKeyCount.py例子:

#!/usr/bin/pythonimport sys;def generateLongCountToken(id):return "LongValueSum:" + id + "\t" + "1"def main(argv):line = sys.stdin.readline();try:while line:line = line[:-1];fields = line.split("\t");print generateLongCountToken(fields[0]);line = sys.stdin.readline();except "end of file":return None if __name__ == "__main__":main(sys.argv)

字段的選取(類似于unix中的 'cut' 命令)

Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduce幫助用戶高效處理文本數(shù)據(jù),就像unix中的“cut”工具。工具類中的map函數(shù)把輸入的key/value對看作字段的列表。用戶可以指定字段的分隔符(默認(rèn)是tab),可以選擇字段列表中任意一段(由列表中一個(gè)或多個(gè)字段組成)作為map輸出的key或者value。同樣,工具類中的reduce函數(shù)也把輸入的key/value對看作字段的列表,用戶可以選取任意一段作為reduce輸出的key或value。例如:

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\-reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \-jobconf map.output.key.field.separa=. \-jobconf num.key.fields.for.partition=2 \-jobconf mapred.data.field.separator=. \-jobconf map.output.key.value.fields.spec=6,5,1-3:0- \-jobconf reduce.output.key.value.fields.spec=0-2:5- \-jobconf mapred.reduce.tasks=12

選項(xiàng)“-jobconf map.output.key.value.fields.spec=6,5,1-3:0-”指定了如何為map的輸出選取key和value。Key選取規(guī)則和value選取規(guī)則由“:”分割。在這個(gè)例子中,map輸出的key由字段6,5,1,2和3組成。輸出的value由所有字段組成(“0-”指字段0以及之后所有字段)。

選項(xiàng)“-jobconf reduce.output.key.value.fields.spec=0-2:0-”(譯者注:此處應(yīng)為”0-2:5-“)指定如何為reduce的輸出選取value。本例中,reduce的輸出的key將包含字段0,1,2(對應(yīng)于原始的字段6,5,1)。reduce輸出的value將包含起自字段5的所有字段(對應(yīng)于所有的原始字段)。

常見問題

我該怎樣使用Hadoop Streaming運(yùn)行一組獨(dú)立(相關(guān))的任務(wù)呢?

多數(shù)情況下,你不需要Map Reduce的全部功能,而只需要運(yùn)行同一程序的多個(gè)實(shí)例,或者使用不同數(shù)據(jù),或者在相同數(shù)據(jù)上使用不同的參數(shù)。你可以通過Hadoop Streaming來實(shí)現(xiàn)。

如何處理多個(gè)文件,其中每個(gè)文件一個(gè)map?

例如這樣一個(gè)問題,在集群上壓縮(zipping)一些文件,你可以使用以下幾種方法:

  • 使用Hadoop Streaming和用戶編寫的mapper腳本程序:
    • 生成一個(gè)文件,文件中包含所有要壓縮的文件在HDFS上的完整路徑。每個(gè)map 任務(wù)獲得一個(gè)路徑名作為輸入。
    • 創(chuàng)建一個(gè)mapper腳本程序,實(shí)現(xiàn)如下功能:獲得文件名,把該文件拷貝到本地,壓縮該文件并把它發(fā)到期望的輸出目錄。
  • 使用現(xiàn)有的Hadoop框架:
    • 在main函數(shù)中添加如下命令: FileOutputFormat.setCompressOutput(conf, true);FileOutputFormat.setOutputCompressorClass(conf, org.apache.hadoop.io.compress.GzipCodec.class);conf.setOutputFormat(NonSplitableTextInputFormat.class);conf.setNumReduceTasks(0);
    • 編寫map函數(shù): public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {output.collect((Text)value, null);}
    • 注意輸出的文件名和原文件名不同
  • 應(yīng)該使用多少個(gè)reducer?

    請參考Hadoop Wiki:Reducer

    如果在Shell腳本里設(shè)置一個(gè)別名,并放在-mapper之后,Streaming會(huì)正常運(yùn)行嗎?例如,alias cl='cut -fl',-mapper "cl"會(huì)運(yùn)行正常嗎?

    腳本里無法使用別名,但是允許變量替換,例如:

    $ hadoop dfs -cat samples/student_marks alice 50 bruce 70 charlie 80 dan 75$ c2='cut -f2'; $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input /user/me/samples/student_marks -mapper \"$c2\" -reducer 'cat' -output /user/me/samples/student_out -jobconf mapred.job.name='Experiment'$ hadoop dfs -ls samples/student_out Found 1 items/user/me/samples/student_out/part-00000 <r 3> 16$ hadoop dfs -cat samples/student_out/part-00000 50 70 75 80

    我可以使用UNIX pipes嗎?例如 –mapper "cut –fl | set s/foo/bar/g"管用么?

    現(xiàn)在不支持,而且會(huì)給出錯(cuò)誤信息“java.io.IOException: Broken pipe”。這或許是一個(gè)bug,需要進(jìn)一步研究。

    在streaming作業(yè)中用-file選項(xiàng)運(yùn)行一個(gè)分布式的超大可執(zhí)行文件(例如,3.6G)時(shí),我得到了一個(gè)錯(cuò)誤信息“No space left on device”。如何解決?

    配置變量stream.tmpdir指定了一個(gè)目錄,在這個(gè)目錄下要進(jìn)行打jar包的操作。stream.tmpdir的默認(rèn)值是/tmp,你需要將這個(gè)值設(shè)置為一個(gè)有更大空間的目錄:

    -jobconf stream.tmpdir=/export/bigspace/...

    如何設(shè)置多個(gè)輸入目錄?

    可以使用多個(gè)-input選項(xiàng)設(shè)置多個(gè)輸入目錄:

    hadoop jar hadoop-streaming.jar -input '/user/foo/dir1' -input '/user/foo/dir2'

    如何生成gzip格式的輸出文件?

    除了純文本格式的輸出,你還可以生成gzip文件格式的輸出,你只需設(shè)置streaming作業(yè)中的選項(xiàng)‘-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCode’。

    Streaming中如何自定義input/output format?

    至少在Hadoop 0.14版本以前,不支持多個(gè)jar文件。所以當(dāng)指定自定義的類時(shí),你要把他們和原有的streaming jar打包在一起,并用這個(gè)自定義的jar包替換默認(rèn)的hadoop streaming jar包。

    Streaming如何解析XML文檔?

    你可以使用StreamXmlRecordReader來解析XML文檔。

    hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)

    Map任務(wù)會(huì)把BEGIN_STRING和END_STRING之間的部分看作一條記錄。

    在streaming應(yīng)用程序中如何更新計(jì)數(shù)器?

    streaming進(jìn)程能夠使用stderr發(fā)出計(jì)數(shù)器信息。reporter:counter:<group>,<counter>,<amount>應(yīng)該被發(fā)送到stderr來更新計(jì)數(shù)器。

    如何更新streaming應(yīng)用程序的狀態(tài)?

    streaming進(jìn)程能夠使用stderr發(fā)出狀態(tài)信息。reporter:status:<message> 要被發(fā)送到stderr來設(shè)置狀態(tài)。


    出處:http://hadoop.apache.org/docs/r1.0.4/cn/streaming.html

    總結(jié)

    以上是生活随笔為你收集整理的Hadoop Streaming的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。