日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop Streaming详解

發(fā)布時間:2023/11/30 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop Streaming详解 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

一: Hadoop Streaming詳解

1、Streaming的作用

Hadoop Streaming框架,最大的好處是,讓任何語言編寫的map, reduce程序能夠在hadoop集群上運行;map/reduce程序只要遵循從標準輸入stdin讀,寫出到標準輸出stdout即可

其次,容易進行單機調(diào)試,通過管道前后相接的方式就可以模擬streaming, 在本地完成map/reduce程序的調(diào)試

# cat inputfile | mapper | sort | reducer > output

最后,streaming框架還提供了作業(yè)提交時的豐富參數(shù)控制,直接通過streaming參數(shù),而不需要使用java語言修改;很多mapreduce的高階功能,都可以通過steaming參數(shù)的調(diào)整來完成

?

?2、Streaming的局限

Streaming默認只能處理文本數(shù)據(jù)Textfile,對于二進制數(shù)據(jù),比較好的方法是將二進制的key, value進行base64編碼,轉(zhuǎn)化為文本

Mapper和reducer的前后都要進行標準輸入和標準輸出的轉(zhuǎn)化,涉及數(shù)據(jù)拷貝和解析,帶來了一定的開銷

?

3、Streaming命令的相關(guān)參數(shù)? ? (普通選項、streaming選項)

Streaming命令的形式如下:

#? /usr/local/src/hadoop-1.2.1/bin/hadoop jar ?hadoop-streaming.jar \

? ?[普通選項]? [Streaming選項]????? ???# ?注意:普通選項一定要寫在streaming選項前面

?

普通選項

參數(shù)

可選/必選

解釋

-conf? 配置文件

可選

指定一個應(yīng)用程序配置文件

-fs? host:port or local

可選

指定一個namenode

-jt?? host:port? or local

可選

指定一個jobtracker

-files 文件1,文件2,

?

-files? hdfs://192.168.179.100:9000/file1.txt,

hdfs://192.168.179.100:9000/file2.txt

?

將代替-cacheFile選項

可選

類似-file, 不同的

1)將HDFS中的多個文件進行分發(fā)

?

2)文件已經(jīng)位于HDFS上

?

3)框架會在該作業(yè)attemps目錄內(nèi)創(chuàng)建一個符號鏈接,指向該作業(yè)的jar目錄(放置所有分發(fā)文件)

-archives

?

框架會在作業(yè)的attempt目錄創(chuàng)建符號鏈接,指向作業(yè)的jar目錄,jar目錄中才是分發(fā)到本地的壓縮文件

?

-archives hdfs://host:fs_port/user/testfile.tgz#tgzdir

testfile.tgz是用戶上傳到HDFS的打包壓縮文件

#后的tgzdir是別名,hadoop-1.2.1中必須要別名

可選

逗號分隔的多個壓縮文件,已經(jīng)位于HDFS上

?

框架自動分發(fā)壓縮文件到計算節(jié)點,并且Inputformat會自動進行解壓

-D?? property=value

可選

重點,很多屬性通過-D指定

?

插曲1: mapred-site.xml 指定mapslotreduceslot

Map和reduce在datanode上的運行,會受到slot的限制,并且有各自的slot限制; 每個Datanode讀取相應(yīng)的配置文件, 從而確定每個datanode上能運行的最大map,reduce個數(shù),以及節(jié)點能力是否充分發(fā)揮

Hadoop1.0中,slot在mapred-site.xml中配(mapreduce作業(yè)前配置好), 基本上每個slot在運行1個map, reduce作業(yè)后會占用1個CPU core,? ?最激進的做法是設(shè)置map和reduce的slot都是CPU core-1 (Map執(zhí)行完后才會進行reduce),? 預(yù)留1個CPU core給tasktracker(比如上報心跳等),? 但通常reducer的slot要比reducer少,考慮大多數(shù)情況下mapper要比reducer多

默認map的slot為2,reduce的slot也為2

<configuration>

??????? <property>

??????????????? <name>mapred.job.tracker</name>

??????????????? <value>http://192.168.179.100:9001</value>

??????? </property>

??????? <property>

?????????????? <name>mapred.tasktracker.map.tasks.maximum</name>

?????????????? <value>15</value>

??????? </property>

??????? <property>

?????????????? <name>mapreduce.tasktracker.tasks.reduce.maximum</name>

?????????????? <value>10</value>

??????? </property>

</configuration>

?

插曲二: mapred-site.xml 指定map最終輸出的merge文件的存放路徑

<configuration>

??????? <property>

??????????????? <name>mapred.job.tracker</name>

??????????????? <value>http://192.168.179.100:9001</value>

??????? </property>

??????? <property>

?????????????? <name>mapred.local.dir</name>

?????????????? <value>/usr/loca/src/hadoop-1.2.1/tmp/mapoutput</value>

??????? </property>

</configuration>

?

當1個作業(yè)被提交并在tasktracer的管理下開始運行時,會對每個job創(chuàng)建1個目錄,所有分發(fā)的文件,都放置在這里

${mapred.local.dir}/taskTracker/$user/jobcache/$jobid/jars/

?

普通選項中的-D? property=value
-D??? 普通選項,使用最多的高級參數(shù),替代-jobconf(參數(shù)將被廢棄),需要注意的是 -D選項要放在streaming參數(shù)的前面,一般我會放在參數(shù)的開頭

類別

?

?

?

?

指定目錄

-D? dfs.data.dir=/tmp

修改本地臨時目錄

?

-D? mapred.local.dir=/tmp/local

-D? mapred.system.dir=/tmp/system

-D? mapred.tmp.dir=/tmp/tmp

指定額外的本地臨時目錄

?

指定作業(yè)名

-D? mapred.job.name=”Test001”

?

?

指定只有map的作業(yè)

-D? mapred.reduce.tasks=0

該作業(yè)只有mapper, mapper的輸出直接作為作業(yè)的輸出

?

指定reducer個數(shù)

-D? mapred.reduce.tasks=2

?

?

指定mapper個數(shù)

-D? mapred.map.tasks=2

指定了不一定生效輸入文件為壓縮文件時,mapper和壓縮文件個數(shù)一一對應(yīng),

輸入數(shù)據(jù)為壓縮文件時,mapper和文件個數(shù)一一對應(yīng),比較好的控制Mapper數(shù)量的方法

指定Mapper輸出的key,value分隔符

-D stream.map.output.field.separator=.

-D stream.num.map.output.key.fields=4

Mapper的輸出使用.做分割符,并且第4個.之前的部分作為key, 剩余的部分作為value (包含剩余的.)

如果mapper的輸出沒有4個., 則整體一行作為key, value為空

默認:

使用

\t做分隔符,第1個\t之前的部分作為key, 剩余為value, 如果mapper輸出沒有\(zhòng)t,則整體一行作為key,value為空

指定reducer輸出的value, key分隔符

-D stream.reduce.output.field.seperator=.

-D stream.num.reduce.output.key.fields=4

指定reduce輸出根據(jù).分割,直到第4個.之前的內(nèi)容為key,其他為value

Reducer程序要根據(jù)指定進行key,value的構(gòu)造

不常用

-D stream.map.input.field.seperator

Inputformat如何分行,默認\n

?

不常用

-D stream.reduce.input.field.seperator

?

?

作業(yè)優(yōu)先級

-D? mapred.job.priority=HIGH

VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH

?

最多同時運行的map任務(wù)數(shù)

-D mapred.job.map.capacity=5

?

?

最多同時運行的reduce任務(wù)數(shù)

-D mapred.job.reduce.capacity=3

?

?

Task沒有響應(yīng)(輸入輸出)的最大時間

-D mapred.task.timeout=6000

毫秒

超時后,該task被終止

Map的輸出是否壓縮

-D mapred.compress.map.output=True

?

?

Map的輸出的壓縮方式

-D mapred.map.output.comression.codec=

?

?

Reduce的輸出是否壓縮

-D mapred.output.compress=True

?

?

Reducer的輸出的壓縮方式

-D mapred.output.compression.codec=

?

?

?

-D 指定job名稱
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -D mapred.job.name=”Test001”
-D 指定reduce任務(wù)、map任務(wù)個數(shù)
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D mapred.job.name=”Teset001” -D mapred.reduce.tasks=2 # reduce task個數(shù),一定生效 -D mapred.map.tasks=5 # map task個數(shù),不一定生效
-D 指定mapper的輸出分隔符
-D stream.map.output.field.seperator=. # 指定mapper每條輸出key,value分隔符 -D stream.num.map.output.key.fields=4 # 第4個.之前的部分為key,剩余為value -D map.output.key.field.separator=. # 設(shè)置map輸出中,Key內(nèi)部的分隔符

?

-D 指定基于哪些key進行分桶

基于指定的Key進行分桶,打標簽

指定列數(shù)

-D num.key.fields.for.partition=1 # 只用1列Key做分桶 -D num.key.fields.for.partition=2 # 使用1,2共兩列key做分桶

指定某些字段做key

-D mapred.text.key.partitioner.option =-k1,2 # 第1,2列Key做分桶 -D mapred.text.key.partitioner.option =-k2,2 # 第2列key做分桶

都要修改partition為能夠只基于某些Key進行分桶的類

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-D 指定將reducer的輸出進行壓縮
-D mapred.output.compress=true-D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-D 指定將mapper的輸出進行壓縮
-D mapred.compress.map.output=true-D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec

?

-D 指定Comparatorkey進行數(shù)字、倒序排序
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ # 使用keyFieldBasedComparator進行key排序-D stream.map.output.field.separator=. \-D stream.num.map.output.key.fields=4 \-D map.output.key.field.separator=. \-D mapred.text.key.comparator.options=-k2,2nr \# -k2,2只用第二列排序,n數(shù)字排序,r倒序(從大到小)-input myInputDirs \-output myOutputDir \-mapper org.apache.hadoop.mapred.lib.IdentityMapper \-reducer org.apache.hadoop.mapred.lib.IdentityReducer

?

-D 指定每個reduce task申請的內(nèi)存數(shù)量
-D mapreduce.reduce.memory.mb=512 #單位為M

?

?

Streaming選項

參數(shù)

可選/必選

參數(shù)描述

-input <HDFS目錄或文件路徑>

支持*通配符,指定多個文件或目錄,多次-input,指定多個輸入文件/目錄

必選

Mapper的輸入數(shù)據(jù),文件要在任務(wù)提交前手動上傳到HDFS

-output <HDFS目錄>

# 路徑不能已存在,否則認為是其他job的輸出

必選

reducer輸出結(jié)果的HDFS存放路徑, ?不能已存在,但腳本中一定要配置

-mapper <可執(zhí)行命令或java>

?

-mapper “python map.py”

-mapper “bash map.sh”

-mapper “perl map.perl”

必選

Mapper程序

-reducer <可執(zhí)行命令或java>

?

-reducer “python reducer.py”

-reducer “bash reducer.sh”

-reducer “perl reducer.sh”

可選

Reducer程序,不需要reduce處理就不指定

-combiner <可執(zhí)行命令或java>

?

-combiner “python map.py”

-combiner “bash map.sh”

-combiner “perl map.perl”

可選

處理mapper輸出的combiner程序

-file

<本地mapper、reducer程序文件、程序運行需要的其他文件>

?

-file map.py

-file reduce.py

-file white_list

可選??????????????????????????? 文件在本地,小文件

將本地文件分發(fā)給計算節(jié)點

?

文件作為作業(yè)的一部分,一起被打包并提交,所有分發(fā)的文件最終會被放置在datanodejob的同一個專屬目錄下:jobcache/job_xxx/jar

?

-cacheFile

hdfs://master:9000/cachefile_dir/white_list

?

分發(fā)HDFS文件

?

Job運行需要的程序,輔助文件都先放到HDFS上,指定HDFS文件路徑,將HDFS文件拷貝到計算節(jié)點,也是都放置在job的同一個專屬目錄下:

jobcache/job_xxx/jar

-cacheArchive

?

hdfs://master:9000/w.tar.gz#WLDIR

?

分發(fā)HDFS壓縮文件、壓縮文件內(nèi)部具有目錄結(jié)構(gòu)

?

?

-numReduceTasks ?<數(shù)字>

?

-numReduceTasks? 2

可選

指定該任務(wù)的reducer個數(shù)

-inputformat? <Java類名>

可選

指定自己定義的inputformat類,默認TextInputformat類

-outputformat? <Java類名>

可選

指定自己定義的outputformat類,默認TextOutputformat類

-cmdenv? name=value

可選

傳遞給streaming命令的環(huán)境變量

?

?

二、Mapper輸入/輸出,根據(jù)哪些key分桶,根據(jù)哪些key進行排序

?

先看看Hadoop-1.2.1 文檔原文中的解釋

As the mapper task runs, it converts its inputs into lines and feed the lines to the stdin of the process. In the meantime, the mapper collects the line oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the?prefix of a line up to the first tab character?is the?key?and the rest of the line (excluding the tab character) will be the?value. If there is no tab character in the line, then entire line is considered as key and the value is null. However, this can be customized, as discussed later.

?

Mapper輸入:

每一個mapper開始運行時,輸入文件會被轉(zhuǎn)換成多行(TextInputformat根據(jù)\n來進行分行),并將每一行傳遞給stdin, 作為Mapper的輸入 mapper直接對stdin中的每行內(nèi)容做處理

?

Mapper輸出分隔符:

默認情況下hadoop設(shè)置mapper輸出的key, value通過tab進行分隔,可以重新指定

-D stream.map.output.field.seperator=.? ? # 指定mapper每條輸出key,value分隔符

-D stream.num.map.output.key.fields=4? ?# 第4個.之前的部分為key,剩余為value

?

mapper的輸出會經(jīng)歷

1、 partition前,根據(jù)mapper輸出分隔符分離出KeyValue

-D stream.map.output.field.separator=. ? ?# 指定mapper每條輸出key,value分隔符

-D stream.num.map.output.key.fields=4 ? # 第4個.之前的為key, 剩下的為value

-D? map.output.key.field.separator=. ? ? ? ?# 設(shè)置map輸出中,Key內(nèi)部的分隔符

?

2、 根據(jù) “分桶分隔符”,確定哪些key被用來做partition(默認是用所有key, 只有1列; 或者是Mapper輸出分隔符分離出的所有key都被用于Partition)

基于指定的Key進行分桶,打標簽

指定列數(shù)

-D num.key.fields.for.partition=1?? ?????# 只用1列Key做分桶,也就是第一列

-D num.key.fields.for.partition=2?????? # 使用1,2共兩列key做分桶(列數(shù))

?

指定某些字段做key

-D mapred.text.key.partitioner.option =-k1,2? ?# 第1,2列Key做分桶

-D mapred.text.key.partitioner.option =-k2,2?? # 第2列key做分桶

?

#都要修改partition為能夠只基于某些Key進行分桶的類

-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

?

3、Spill時根據(jù)Partition標簽和所有Key進行排序

4、Partition標簽和key之間,也是通過mapper輸出分隔符來隔離

5、reducer前的文件會刪除partition標簽,并根據(jù)Mapper輸出分隔符確定出key, 進行Reducer前的歸并排序;(reducer前的歸并排序,基于所有mapper的key進行排序

因此如果要定義新的Mapper輸出分隔符就要做到:1)mapper代碼中根據(jù)新分隔符來構(gòu)建輸出到stdout的內(nèi)容;2)提交作業(yè)時,通過—D 指定新的Mapper輸出分隔符,以及第幾個分隔符來分離Key

?

Reducer的輸入:

每個Reducer的每條輸入,就是去除Partition標簽(根據(jù)Mapper分隔符分離出partition標簽)后的內(nèi)容,和Mapper輸出到stdout中的內(nèi)容相同,但不同記錄之間已經(jīng)做了排序;因此如果重新指定了Mapper的輸出分隔符,Reducer程序就要修改為根據(jù)新的Mapper輸出分隔符來分離Key,value;

?

Reducer的輸出:

Reducer的輸出,默認也是根據(jù)tab來分離key,value, 這也是reducer程序要根據(jù)tab來組合key,value輸出給stdout的原因; ?Reducer輸出分隔符重新指定,Reducer程序中輸出給stdout的內(nèi)容也要配合新的分隔符來構(gòu)造(Reducer->stdout-> outputformat ->file, ?outputformat根據(jù)reducer的輸出分隔符來分離key,value, ?并寫入文件

-D stream.reduce.output.field.seperator=.?? ???# reducer輸出key,value間的分隔符

-D stream.num.reduce.output.key.fields=4? ???# 第4個.之前的內(nèi)容為key, 其他為value

轉(zhuǎn)載于:https://www.cnblogs.com/shay-zhangjin/p/7714868.html

總結(jié)

以上是生活随笔為你收集整理的Hadoop Streaming详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。