Hadoop Streaming
Hadoop Streaming
Hadoop Streaming
??? Hadoop Streaming
??? Streaming工作原理
??? 將文件打包到提交的作業(yè)中
??? Streaming選項(xiàng)與用法
??????? 只使用Mapper的作業(yè)
??????? 為作業(yè)指定其他插件
??????? Hadoop Streaming中的大文件和檔案
??????? 為作業(yè)指定附加配置參數(shù)
??????? 其他選項(xiàng)
??? 其他例子
??????? 使用自定義的方法切分行來形成Key/Value對
??????? 一個(gè)實(shí)用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項(xiàng))
??????? Hadoop聚合功能包的使用(-reduce aggregate 選項(xiàng))
??????? 字段的選取(類似于unix中的 'cut' 命令)
??? 常見問題
??????? 我該怎樣使用Hadoop Streaming運(yùn)行一組獨(dú)立(相關(guān))的任務(wù)呢?
??????? 如何處理多個(gè)文件,其中每個(gè)文件一個(gè)map?
??????? 應(yīng)該使用多少個(gè)reducer?
??????? 如果在Shell腳本里設(shè)置一個(gè)別名,并放在-mapper之后,Streaming會(huì)正常運(yùn)行嗎?例如,alias cl='cut -fl',-mapper "cl"會(huì)運(yùn)行正常嗎?
??????? 我可以使用UNIX pipes嗎?例如 –mapper "cut –fl | set s/foo/bar/g"管用么?
??????? 在streaming作業(yè)中用-file選項(xiàng)運(yùn)行一個(gè)分布式的超大可執(zhí)行文件(例如,3.6G)時(shí),我得到了一個(gè)錯(cuò)誤信息“No space left on device”。如何解決?
??????? 如何設(shè)置多個(gè)輸入目錄?
??????? 如何生成gzip格式的輸出文件?
??????? Streaming中如何自定義input/output format?
??????? Streaming如何解析XML文檔?
??????? 在streaming應(yīng)用程序中如何更新計(jì)數(shù)器?
??????? 如何更新streaming應(yīng)用程序的狀態(tài)?
Hadoop streaming是Hadoop的一個(gè)工具, 它幫助用戶創(chuàng)建和運(yùn)行一類特殊的map/reduce作業(yè), 這些特殊的map/reduce作業(yè)是由一些可執(zhí)行文件或腳本文件充當(dāng)mapper或者reducer。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper /bin/cat \-reducer /bin/wcStreaming工作原理
在上面的例子里,mapper和reducer都是可執(zhí)行文件,它們從標(biāo)準(zhǔn)輸入讀入數(shù)據(jù)(一行一行讀),并把計(jì)算結(jié)果發(fā)給標(biāo)準(zhǔn)輸出。Streaming工具會(huì)創(chuàng)建一個(gè)Map/Reduce作業(yè),并把它發(fā)送給合適的集群,同時(shí)監(jiān)視這個(gè)作業(yè)的整個(gè)執(zhí)行過程。
如果一個(gè)可執(zhí)行文件被用于mapper,則在mapper初始化時(shí),每一個(gè)mapper任務(wù)會(huì)把這個(gè)可執(zhí)行文件作為一個(gè)單獨(dú)的進(jìn)程啟動(dòng)。mapper任務(wù)運(yùn)行時(shí),它把輸入切分成行并把每一行提供給可執(zhí)行文件進(jìn)程的標(biāo)準(zhǔn)輸入。同時(shí),mapper收集可執(zhí)行文件進(jìn)程標(biāo)準(zhǔn)輸出的內(nèi)容,并把收到的每一行內(nèi)容轉(zhuǎn)化成key/value對,作為mapper的輸出。默認(rèn)情況下,一行中第一個(gè)tab之前的部分作為key,之后的(不包括tab)作為value。如果沒有tab,整行作為key值,value值為null。不過,這可以定制,在下文中將會(huì)討論如何自定義key和value的切分方式。
如果一個(gè)可執(zhí)行文件被用于reducer,每個(gè)reducer任務(wù)會(huì)把這個(gè)可執(zhí)行文件作為一個(gè)單獨(dú)的進(jìn)程啟動(dòng)。Reducer任務(wù)運(yùn)行時(shí),它把輸入切分成行并把每一行提供給可執(zhí)行文件進(jìn)程的標(biāo)準(zhǔn)輸入。同時(shí),reducer收集可執(zhí)行文件進(jìn)程標(biāo)準(zhǔn)輸出的內(nèi)容,并把每一行內(nèi)容轉(zhuǎn)化成key/value對,作為reducer的輸出。默認(rèn)情況下,一行中第一個(gè)tab之前的部分作為key,之后的(不包括tab)作為value。在下文中將會(huì)討論如何自定義key和value的切分方式。
這是Map/Reduce框架和streaming mapper/reducer之間的基本通信協(xié)議。
用戶也可以使用java類作為mapper或者reducer。上面的例子與這里的代碼等價(jià):
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer /bin/wc用戶可以設(shè)定stream.non.zero.exit.is.failure true 或false 來表明streaming task的返回值非零時(shí)是Failure 還是Success。默認(rèn)情況,streaming task返回非零時(shí)表示失敗。
將文件打包到提交的作業(yè)中
任何可執(zhí)行文件都可以被指定為mapper/reducer。這些可執(zhí)行文件不需要事先存放在集群上;如果在集群上還沒有,則需要用-file選項(xiàng)讓framework把可執(zhí)行文件作為作業(yè)的一部分,一起打包提交。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myPythonScript.py \-reducer /bin/wc \-file myPythonScript.py上面的例子描述了一個(gè)用戶把可執(zhí)行python文件作為mapper。其中的選項(xiàng)“-file myPythonScirpt.py”使可執(zhí)行python文件作為作業(yè)提交的一部分被上傳到集群的機(jī)器上。
除了可執(zhí)行文件外,其他mapper或reducer需要用到的輔助文件(比如字典,配置文件等)也可以用這種方式打包上傳。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myPythonScript.py \-reducer /bin/wc \-file myPythonScript.py \-file myDictionary.txtStreaming選項(xiàng)與用法
只使用Mapper的作業(yè)
有時(shí)只需要map函數(shù)處理輸入數(shù)據(jù)。這時(shí)只需把mapred.reduce.tasks設(shè)置為零,Map/reduce框架就不會(huì)創(chuàng)建reducer任務(wù),mapper任務(wù)的輸出就是整個(gè)作業(yè)的最終輸出。
為了做到向下兼容,Hadoop Streaming也支持“-reduce None”選項(xiàng),它與“-jobconf mapred.reduce.tasks=0”等價(jià)。
為作業(yè)指定其他插件
和其他普通的Map/Reduce作業(yè)一樣,用戶可以為streaming作業(yè)指定其他插件:
-inputformat JavaClassName-outputformat JavaClassName-partitioner JavaClassName-combiner JavaClassName用于處理輸入格式的類要能返回Text類型的key/value對。如果不指定輸入格式,則默認(rèn)會(huì)使用TextInputFormat。因?yàn)門extInputFormat得到的key值是LongWritable類型的(其實(shí)key值并不是輸入文件中的內(nèi)容,而是value偏移量),所以key會(huì)被丟棄,只把value用管道方式發(fā)給mapper。
用戶提供的定義輸出格式的類需要能夠處理Text類型的key/value對。如果不指定輸出格式,則默認(rèn)會(huì)使用TextOutputFormat類。
Hadoop Streaming中的大文件和檔案
任務(wù)使用-cacheFile和-cacheArchive選項(xiàng)在集群中分發(fā)文件和檔案,選項(xiàng)的參數(shù)是用戶已上傳至HDFS的文件或檔案的URI。這些文件和檔案在不同的作業(yè)間緩存。用戶可以通過fs.default.name.config配置參數(shù)的值得到文件所在的host和fs_port。
這個(gè)是使用-cacheFile選項(xiàng)的例子:
-cacheFile hdfs://host:fs_port/user/testfile.txt#testlink在上面的例子里,url中#后面的部分是建立在任務(wù)當(dāng)前工作目錄下的符號(hào)鏈接的名字。這里的任務(wù)的當(dāng)前工作目錄下有一個(gè)“testlink”符號(hào)鏈接,它指向testfile.txt文件在本地的拷貝。如果有多個(gè)文件,選項(xiàng)可以寫成:
-cacheFile hdfs://host:fs_port/user/testfile1.txt#testlink1 -cacheFile hdfs://host:fs_port/user/testfile2.txt#testlink2-cacheArchive選項(xiàng)用于把jar文件拷貝到任務(wù)當(dāng)前工作目錄并自動(dòng)把jar文件解壓縮。例如:
-cacheArchive hdfs://host:fs_port/user/testfile.jar#testlink3在上面的例子中,testlink3是當(dāng)前工作目錄下的符號(hào)鏈接,它指向testfile.jar解壓后的目錄。
下面是使用-cacheArchive選項(xiàng)的另一個(gè)例子。其中,input.txt文件有兩行內(nèi)容,分別是兩個(gè)文件的名字:testlink/cache.txt和testlink/cache2.txt。“testlink”是指向檔案目錄(jar文件解壓后的目錄)的符號(hào)鏈接,這個(gè)目錄下有“cache.txt”和“cache2.txt”兩個(gè)文件。
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input "/user/me/samples/cachefile/input.txt" \-mapper "xargs cat" \-reducer "cat" \-output "/user/me/samples/cachefile/out" \ -cacheArchive 'hdfs://hadoop-nn1.example.com/user/me/samples/cachefile/cachedir.jar#testlink' \ -jobconf mapred.map.tasks=1 \-jobconf mapred.reduce.tasks=1 \ -jobconf mapred.job.name="Experiment"$ ls test_jar/ cache.txt cache2.txt$ jar cvf cachedir.jar -C test_jar/ . added manifest adding: cache.txt(in = 30) (out= 29)(deflated 3%) adding: cache2.txt(in = 37) (out= 35)(deflated 5%)$ hadoop dfs -put cachedir.jar samples/cachefile$ hadoop dfs -cat /user/me/samples/cachefile/input.txt testlink/cache.txt testlink/cache2.txt$ cat test_jar/cache.txt This is just the cache string$ cat test_jar/cache2.txt This is just the second cache string$ hadoop dfs -ls /user/me/samples/cachefile/out Found 1 items /user/me/samples/cachefile/out/part-00000 <r 3> 69$ hadoop dfs -cat /user/me/samples/cachefile/out/part-00000 This is just the cache string This is just the second cache string為作業(yè)指定附加配置參數(shù)
用戶可以使用“-jobconf <n>=<v>”增加一些配置變量。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper\-reducer /bin/wc \-jobconf mapred.reduce.tasks=2上面的例子中,-jobconf mapred.reduce.tasks=2表明用兩個(gè)reducer完成作業(yè)。
關(guān)于jobconf參數(shù)的更多細(xì)節(jié)可以參考:hadoop-default.html
其他選項(xiàng)
Streaming 作業(yè)的其他選項(xiàng)如下表:
| -cluster name | 可選 | 在本地Hadoop集群與一個(gè)或多個(gè)遠(yuǎn)程集群間切換 |
| -dfs host:port or local | 可選 | 覆蓋作業(yè)的HDFS配置 |
| -jt host:port or local | 可選 | 覆蓋作業(yè)的JobTracker配置 |
| -additionalconfspec specfile | 可選 | 用一個(gè)類似于hadoop-site.xml的XML文件保存所有配置,從而不需要用多個(gè)"-jobconf name=value"類型的選項(xiàng)單獨(dú)為每個(gè)配置變量賦值 |
| -cmdenv name=value | 可選 | 傳遞環(huán)境變量給streaming命令 |
| -cacheFile fileNameURI | 可選 | 指定一個(gè)上傳到HDFS的文件 |
| -cacheArchive fileNameURI | 可選 | 指定一個(gè)上傳到HDFS的jar文件,這個(gè)jar文件會(huì)被自動(dòng)解壓縮到當(dāng)前工作目錄下 |
| -inputreader JavaClassName | 可選 | 為了向下兼容:指定一個(gè)record reader類(而不是input format類) |
| -verbose | 可選 | 詳細(xì)輸出 |
使用-cluster <name>實(shí)現(xiàn)“本地”Hadoop和一個(gè)或多個(gè)遠(yuǎn)程Hadoop集群間切換。默認(rèn)情況下,使用hadoop-default.xml和hadoop-site.xml;當(dāng)使用-cluster <name>選項(xiàng)時(shí),會(huì)使用$HADOOP_HOME/conf/hadoop-<name>.xml。
下面的選項(xiàng)改變temp目錄:
-jobconf dfs.data.dir=/tmp下面的選項(xiàng)指定其他本地temp目錄:
-jobconf mapred.local.dir=/tmp/local-jobconf mapred.system.dir=/tmp/system-jobconf mapred.temp.dir=/tmp/temp更多有關(guān)jobconf的細(xì)節(jié)請參考:http://wiki.apache.org/hadoop/JobConfFile
在streaming命令中設(shè)置環(huán)境變量:
-cmdenv EXAMPLE_DIR=/home/example/dictionaries/其他例子
使用自定義的方法切分行來形成Key/Value對
之前已經(jīng)提到,當(dāng)Map/Reduce框架從mapper的標(biāo)準(zhǔn)輸入讀取一行時(shí),它把這一行切分為key/value對。在默認(rèn)情況下,每行第一個(gè)tab符之前的部分作為key,之后的部分作為value(不包括tab符)。
但是,用戶可以自定義,可以指定分隔符是其他字符而不是默認(rèn)的tab符,或者指定在第n(n>=1)個(gè)分割符處分割而不是默認(rèn)的第一個(gè)。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer \-jobconf stream.map.output.field.separator=. \-jobconf stream.num.map.output.key.fields=4在上面的例子,“-jobconf stream.map.output.field.separator=.”指定“.”作為map輸出內(nèi)容的分隔符,并且從在第四個(gè)“.”之前的部分作為key,之后的部分作為value(不包括這第四個(gè)“.”)。 如果一行中的“.”少于四個(gè),則整行的內(nèi)容作為key,value設(shè)為空的Text對象(就像這樣創(chuàng)建了一個(gè)Text:new Text(""))。
同樣,用戶可以使用“-jobconf stream.reduce.output.field.separator=SEP”和“-jobconf stream.num.reduce.output.fields=NUM”來指定reduce輸出的行中,第幾個(gè)分隔符處分割key和value。
一個(gè)實(shí)用的Partitioner類 (二次排序,-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 選項(xiàng))
Hadoop有一個(gè)工具類org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner,它在應(yīng)用程序中很有用。Map/reduce框架用這個(gè)類切分map的輸出,切分是基于key值的前綴,而不是整個(gè)key。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer \-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \-jobconf stream.map.output.field.separator=. \-jobconf stream.num.map.output.key.fields=4 \-jobconf map.output.key.field.separator=. \-jobconf num.key.fields.for.partition=2 \-jobconf mapred.reduce.tasks=12其中,-jobconf stream.map.output.field.separator=. 和-jobconf stream.num.map.output.key.fields=4是前文中的例子。Streaming用這兩個(gè)變量來得到mapper的key/value對。
上面的Map/Reduce 作業(yè)中map輸出的key一般是由“.”分割成的四塊。但是因?yàn)槭褂昧?jobconf num.key.fields.for.partition=2 選項(xiàng),所以Map/Reduce框架使用key的前兩塊來切分map的輸出。其中,-jobconf map.output.key.field.separator=.指定了這次切分使用的key的分隔符。這樣可以保證在所有key/value對中,key值前兩個(gè)塊值相同的所有key被分到一組,分配給一個(gè)reducer。
這種高效的方法等價(jià)于指定前兩塊作為主鍵,后兩塊作為副鍵。主鍵用于切分塊,主鍵和副鍵的組合用于排序。一個(gè)簡單的示例如下:
Map的輸出(key)
11.12.1.2 11.14.2.3 11.11.4.1 11.12.1.1 11.14.2.2切分給3個(gè)reducer(前兩塊的值用于切分)
11.11.4.1 ----------- 11.12.1.2 11.12.1.1 ----------- 11.14.2.3 11.14.2.2在每個(gè)切分后的組內(nèi)排序(四個(gè)塊的值都用于排序)
11.11.4.1 ----------- 11.12.1.1 11.12.1.2 ----------- 11.14.2.2 11.14.2.3Hadoop聚合功能包的使用(-reduce aggregate 選項(xiàng))
Hadoop有一個(gè)工具包“Aggregate”(https://svn.apache.org/repos/asf/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/aggregate)。“Aggregate”提供一個(gè)特殊的reducer類和一個(gè)特殊的combiner類,并且有一系列的“聚合器”(“aggregator”)(例如“sum”,“max”,“min”等)用于聚合一組value的序列。用戶可以使用Aggregate定義一個(gè)mapper插件類,這個(gè)類用于為mapper輸入的每個(gè)key/value對產(chǎn)生“可聚合項(xiàng)”。combiner/reducer利用適當(dāng)?shù)木酆掀骶酆线@些可聚合項(xiàng)。
要使用Aggregate,只需指定“-reducer aggregate”:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper myAggregatorForKeyCount.py \-reducer aggregate \-file myAggregatorForKeyCount.py \-jobconf mapred.reduce.tasks=12python程序myAggregatorForKeyCount.py例子:
#!/usr/bin/pythonimport sys;def generateLongCountToken(id):return "LongValueSum:" + id + "\t" + "1"def main(argv):line = sys.stdin.readline();try:while line:line = line[:-1];fields = line.split("\t");print generateLongCountToken(fields[0]);line = sys.stdin.readline();except "end of file":return None if __name__ == "__main__":main(sys.argv)字段的選取(類似于unix中的 'cut' 命令)
Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduce幫助用戶高效處理文本數(shù)據(jù),就像unix中的“cut”工具。工具類中的map函數(shù)把輸入的key/value對看作字段的列表。用戶可以指定字段的分隔符(默認(rèn)是tab),可以選擇字段列表中任意一段(由列表中一個(gè)或多個(gè)字段組成)作為map輸出的key或者value。同樣,工具類中的reduce函數(shù)也把輸入的key/value對看作字段的列表,用戶可以選取任意一段作為reduce輸出的key或value。例如:
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\-reducer org.apache.hadoop.mapred.lib.FieldSelectionMapReduce\-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \-jobconf map.output.key.field.separa=. \-jobconf num.key.fields.for.partition=2 \-jobconf mapred.data.field.separator=. \-jobconf map.output.key.value.fields.spec=6,5,1-3:0- \-jobconf reduce.output.key.value.fields.spec=0-2:5- \-jobconf mapred.reduce.tasks=12選項(xiàng)“-jobconf map.output.key.value.fields.spec=6,5,1-3:0-”指定了如何為map的輸出選取key和value。Key選取規(guī)則和value選取規(guī)則由“:”分割。在這個(gè)例子中,map輸出的key由字段6,5,1,2和3組成。輸出的value由所有字段組成(“0-”指字段0以及之后所有字段)。
選項(xiàng)“-jobconf reduce.output.key.value.fields.spec=0-2:0-”(譯者注:此處應(yīng)為”0-2:5-“)指定如何為reduce的輸出選取value。本例中,reduce的輸出的key將包含字段0,1,2(對應(yīng)于原始的字段6,5,1)。reduce輸出的value將包含起自字段5的所有字段(對應(yīng)于所有的原始字段)。
常見問題
我該怎樣使用Hadoop Streaming運(yùn)行一組獨(dú)立(相關(guān))的任務(wù)呢?
多數(shù)情況下,你不需要Map Reduce的全部功能,而只需要運(yùn)行同一程序的多個(gè)實(shí)例,或者使用不同數(shù)據(jù),或者在相同數(shù)據(jù)上使用不同的參數(shù)。你可以通過Hadoop Streaming來實(shí)現(xiàn)。
如何處理多個(gè)文件,其中每個(gè)文件一個(gè)map?
例如這樣一個(gè)問題,在集群上壓縮(zipping)一些文件,你可以使用以下幾種方法:
- 生成一個(gè)文件,文件中包含所有要壓縮的文件在HDFS上的完整路徑。每個(gè)map 任務(wù)獲得一個(gè)路徑名作為輸入。
- 創(chuàng)建一個(gè)mapper腳本程序,實(shí)現(xiàn)如下功能:獲得文件名,把該文件拷貝到本地,壓縮該文件并把它發(fā)到期望的輸出目錄。
- 在main函數(shù)中添加如下命令: FileOutputFormat.setCompressOutput(conf, true);FileOutputFormat.setOutputCompressorClass(conf, org.apache.hadoop.io.compress.GzipCodec.class);conf.setOutputFormat(NonSplitableTextInputFormat.class);conf.setNumReduceTasks(0);
- 編寫map函數(shù): public void map(WritableComparable key, Writable value, OutputCollector output, Reporter reporter) throws IOException {output.collect((Text)value, null);}
- 注意輸出的文件名和原文件名不同
應(yīng)該使用多少個(gè)reducer?
請參考Hadoop Wiki:Reducer
如果在Shell腳本里設(shè)置一個(gè)別名,并放在-mapper之后,Streaming會(huì)正常運(yùn)行嗎?例如,alias cl='cut -fl',-mapper "cl"會(huì)運(yùn)行正常嗎?
腳本里無法使用別名,但是允許變量替換,例如:
$ hadoop dfs -cat samples/student_marks alice 50 bruce 70 charlie 80 dan 75$ c2='cut -f2'; $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-input /user/me/samples/student_marks -mapper \"$c2\" -reducer 'cat' -output /user/me/samples/student_out -jobconf mapred.job.name='Experiment'$ hadoop dfs -ls samples/student_out Found 1 items/user/me/samples/student_out/part-00000 <r 3> 16$ hadoop dfs -cat samples/student_out/part-00000 50 70 75 80我可以使用UNIX pipes嗎?例如 –mapper "cut –fl | set s/foo/bar/g"管用么?
現(xiàn)在不支持,而且會(huì)給出錯(cuò)誤信息“java.io.IOException: Broken pipe”。這或許是一個(gè)bug,需要進(jìn)一步研究。
在streaming作業(yè)中用-file選項(xiàng)運(yùn)行一個(gè)分布式的超大可執(zhí)行文件(例如,3.6G)時(shí),我得到了一個(gè)錯(cuò)誤信息“No space left on device”。如何解決?
配置變量stream.tmpdir指定了一個(gè)目錄,在這個(gè)目錄下要進(jìn)行打jar包的操作。stream.tmpdir的默認(rèn)值是/tmp,你需要將這個(gè)值設(shè)置為一個(gè)有更大空間的目錄:
-jobconf stream.tmpdir=/export/bigspace/...如何設(shè)置多個(gè)輸入目錄?
可以使用多個(gè)-input選項(xiàng)設(shè)置多個(gè)輸入目錄:
hadoop jar hadoop-streaming.jar -input '/user/foo/dir1' -input '/user/foo/dir2'如何生成gzip格式的輸出文件?
除了純文本格式的輸出,你還可以生成gzip文件格式的輸出,你只需設(shè)置streaming作業(yè)中的選項(xiàng)‘-jobconf mapred.output.compress=true -jobconf mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCode’。
Streaming中如何自定義input/output format?
至少在Hadoop 0.14版本以前,不支持多個(gè)jar文件。所以當(dāng)指定自定義的類時(shí),你要把他們和原有的streaming jar打包在一起,并用這個(gè)自定義的jar包替換默認(rèn)的hadoop streaming jar包。
Streaming如何解析XML文檔?
你可以使用StreamXmlRecordReader來解析XML文檔。
hadoop jar hadoop-streaming.jar -inputreader "StreamXmlRecord,begin=BEGIN_STRING,end=END_STRING" ..... (rest of the command)Map任務(wù)會(huì)把BEGIN_STRING和END_STRING之間的部分看作一條記錄。
在streaming應(yīng)用程序中如何更新計(jì)數(shù)器?
streaming進(jìn)程能夠使用stderr發(fā)出計(jì)數(shù)器信息。reporter:counter:<group>,<counter>,<amount>應(yīng)該被發(fā)送到stderr來更新計(jì)數(shù)器。
如何更新streaming應(yīng)用程序的狀態(tài)?
streaming進(jìn)程能夠使用stderr發(fā)出狀態(tài)信息。reporter:status:<message> 要被發(fā)送到stderr來設(shè)置狀態(tài)。
出處:http://hadoop.apache.org/docs/r1.0.4/cn/streaming.html
總結(jié)
以上是生活随笔為你收集整理的Hadoop Streaming的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop Map/Reduce教程
- 下一篇: Spark随谈