Hadoop 2.6.1 configuration on CentOS 7
Three ways of uploading (distributing) files to the compute nodes:
1. Local:
The -file mode, used to upload small files together with the job.
For example:
-file ./test
run.sh:
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list
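Before submitting the job, the mapper and reducer can be exercised locally with an ordinary shell pipeline (a minimal sketch; it assumes map.py, red.py, white_list and The_Man_of_Property.txt all sit in the current directory):

# Simulate the streaming flow locally: map, shuffle-sort, then reduce.
cat The_Man_of_Property.txt \
    | python map.py mapper_func white_list \
    | sort -k1,1 \
    | python red.py reduer_func \
    > local_output.txt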
2. -cacheFile: distributes a file that already lives on HDFS to the compute nodes (the file has to be uploaded to HDFS first).
For example:
-cacheFile "hdfs://master:9000/white_list#ABC" \
3. -cacheArchive: distributes an archive that already lives on HDFS to the compute nodes (the archive has to be uploaded to HDFS first).
For example:
-cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \這種情況是streaming結(jié)構(gòu)會(huì)自動(dòng)給你解壓文件,不用你去考慮。只需要改相應(yīng)的文件路徑就好了。
map.py:
#!/usr/bin/python
import os
import sys

def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list

def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            #if word != "" and (word in word_set):
            if word != "":
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

red.py:
#!/usr/bin/python
import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word == None:
            current_word = word
        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0
        count_pool.append(int(val))
    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

run.sh:
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \
    -jobconf "mapred.job.name=cachefile_demo" \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"

The next run.sh combines output compression with a custom partitioner. (The Python scripts can start with "#!/usr/bin/env python", which lets the system locate the python executable by itself.)

run.sh:
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

# Input files; several files can be listed separated by commas, and they must be uploaded to HDFS first.
INPUT_FILE_PATH_1="/1.txt,/2.txt"
# Output directory on HDFS.
OUTPUT_PATH="/table1"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
# Notes on the options below:
#   - mapred.reduce.tasks=2 sets the number of reducers.
#   - mapred.compress.map.output / mapred.map.output.compression.codec enable compression of the
#     map-stage output and choose its format.
#   - mapred.output.compress / mapred.output.compression.codec enable compression of the final
#     output and choose its format.
#   - -cacheArchive is the third way of shipping files; the name after "#" is an alias that can be
#     used in the configuration, and the slave nodes also use it as the directory name at run time.
#   - stream.map.output.field.separator sets the separator of the mapper output (the default is the tab "\t").
#   - stream.num.map.output.key.fields=2 makes the first two separated fields the key and the rest the value.
#   - map.output.key.field.separator sets the separator used inside the key.
#   - num.key.fields.for.partition=1 uses the first key column for bucketing after that split.
#   - -partitioner must be set to KeyFieldBasedPartitioner when you configure the partitioning yourself.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file ./map.py \
    -file ./red.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -jobconf stream.map.output.field.separator=',' \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf map.output.key.field.separator=',' \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-jobconf mapred.text.key.partitioner.options=-k2,3 \
is an extension of
-jobconf num.key.fields.for.partition=1 \
It means that, within the key, columns 2 and 3 are used for partitioning. When these partition options are not set, the default is to partition on the whole key. Records are first bucketed, and then sorted by key inside each bucket.
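A hedged sketch of how that alternative option slots into the same job (only the partition-related part of the command is shown, reusing the variables from run.sh above; the sample record in the comments is made up):

# Sample mapper output, with fields separated by ',':  1,2,3,v1
# stream.num.map.output.key.fields=2 makes the key "1,2" and the value "3,v1";
# the -k2,3 option then partitions on key columns 2-3 instead of only column 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file ./map.py \
    -file ./red.py \
    -jobconf stream.map.output.field.separator=',' \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf map.output.key.field.separator=',' \
    -jobconf mapred.text.key.partitioner.options=-k2,3 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner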
Additional notes:
You can control the number of map tasks by using compressed input files: one compressed file corresponds to exactly one map task, because a gzip file cannot be split. This also leaves the paths untouched, i.e. the directory structure can stay unchanged. See the sketch below.
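A minimal sketch of the trick (the local file names are made up); Hadoop decompresses gzip input transparently, and each .gz file is handled by exactly one map task:

# Three gzip files -> three map tasks.
gzip part1.txt part2.txt part3.txt
# Put them in one HDFS directory and use that directory as -input for the streaming job.
/usr/local/src/hadoop-2.6.1/bin/hadoop fs -mkdir -p /input_gz
/usr/local/src/hadoop-2.6.1/bin/hadoop fs -put part1.txt.gz part2.txt.gz part3.txt.gz /input_gz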
Reposted from: https://www.cnblogs.com/taozizainali/p/8811893.html