Hadoop 2.6.1 configuration (CentOS 7)



Three ways to upload (distribute) files to the compute nodes with Hadoop Streaming:

1. Local files: -file

The -file option ships small local files to the compute nodes together with the job, such as the map.py, red.py and white_list used below.

For example:

-file ./test

run.sh:

HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_file_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func white_list" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=3" \
    -file ./map.py \
    -file ./red.py \
    -file ./white_list
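
After the job finishes, the output can be inspected roughly as follows (a sketch, not part of the original post; with mapred.reduce.tasks=3 the directory holds part-00000 through part-00002):

# list the output directory and peek at one part file
/usr/local/src/hadoop-2.6.1/bin/hadoop fs -ls /output_file_broadcast
/usr/local/src/hadoop-2.6.1/bin/hadoop fs -cat /output_file_broadcast/part-00000 | head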


2. -cacheFile: distributes a file that is already on HDFS to the compute nodes. (The file must be uploaded to HDFS first.)

For example:

-cacheFile "hdfs://master:9000/white_list#ABC" \


3. -cacheArchive: distributes a compressed archive that is already on HDFS to the compute nodes. (The archive must be uploaded to HDFS first.)

For example:

-cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
In this case the Streaming framework unpacks the archive on the compute nodes automatically, so you do not have to handle that yourself; you only need to point your code at the corresponding paths under the alias given after '#'.
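
One possible way to prepare and upload such an archive (a sketch; the word-list file names are assumptions for illustration):

# pack the word-list files directly into a tar.gz, so that after unpacking
# they sit immediately under the alias directory (WH.gz in the example below)
tar -czf w.tar.gz white_list_1 white_list_2
# upload the archive to HDFS so that -cacheArchive can reference it
/usr/local/src/hadoop-2.6.1/bin/hadoop fs -put w.tar.gz /
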
map.py:

#!/usr/bin/python
import os
import sys

def get_file_handler(f):
    file_in = open(f, 'r')
    return file_in

def get_cachefile_handlers(f):
    # the alias of the unpacked archive is a directory; open every file in it
    f_handlers_list = []
    if os.path.isdir(f):
        for fd in os.listdir(f):
            f_handlers_list.append(get_file_handler(f + '/' + fd))
    return f_handlers_list

def read_local_file_func(f):
    word_set = set()
    for cachefile in get_cachefile_handlers(f):
        for line in cachefile:
            word = line.strip()
            word_set.add(word)
    return word_set

def mapper_func(white_list_fd):
    word_set = read_local_file_func(white_list_fd)
    for line in sys.stdin:
        ss = line.strip().split(' ')
        for s in ss:
            word = s.strip()
            #if word != "" and (word in word_set):
            if word != "":
                print "%s\t%s" % (s, 1)

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

red.py:

#!/usr/bin/python
import sys

def reduer_func():
    current_word = None
    count_pool = []
    sum = 0
    for line in sys.stdin:
        word, val = line.strip().split('\t')
        if current_word == None:
            current_word = word
        if current_word != word:
            for count in count_pool:
                sum += count
            print "%s\t%s" % (current_word, sum)
            current_word = word
            count_pool = []
            sum = 0
        count_pool.append(int(val))
    for count in count_pool:
        sum += count
    print "%s\t%s" % (current_word, str(sum))

if __name__ == "__main__":
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])
    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]
    func(*args)

run.sh (for the -cacheArchive job):

HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH_1="/The_Man_of_Property.txt"
OUTPUT_PATH="/output_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py mapper_func WH.gz" \
    -reducer "python red.py reduer_func" \
    -jobconf "mapred.reduce.tasks=10" \
    -jobconf "mapred.job.name=cachefile_demo" \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -file "./map.py" \
    -file "./red.py"

A second run.sh, showing compression, custom separators and partitioning:

HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

# Note: the Python scripts can start with "#!/usr/bin/env python",
# which lets the system find the python executable on its own.

# Input files; several files can be listed separated by commas,
# and they must already be uploaded to HDFS.
INPUT_FILE_PATH_1="/1.txt,/2.txt"
# Output directory on HDFS
OUTPUT_PATH="/table1"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
# mapred.reduce.tasks                 : number of reducers
# mapred.compress.map.output (+codec) : whether the map-stage output is compressed, and its format
# mapred.output.compress (+codec)     : whether the final output is compressed, and its format
# -cacheArchive                       : the third way of shipping files; the name after '#' is an
#                                       alias that the scripts can use, and the slave nodes also
#                                       use the alias as the directory name while the job runs
# stream.map.output.field.separator   : field separator of the map output (default is tab '\t')
# stream.num.map.output.key.fields    : the first N separated fields form the key, the rest the value
# map.output.key.field.separator      : separator applied inside the key
# num.key.fields.for.partition        : after splitting the key, use its first field for bucketing
# -partitioner                        : has to be set when you configure partitioning yourself
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file ./map.py \
    -file ./red.py \
    -jobconf mapred.reduce.tasks=2 \
    -jobconf "mapred.compress.map.output=true" \
    -jobconf "mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -jobconf "mapred.output.compress=true" \
    -jobconf "mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec" \
    -cacheArchive "hdfs://master:9000/w.tar.gz#WH.gz" \
    -jobconf stream.map.output.field.separator=',' \
    -jobconf stream.num.map.output.key.fields=2 \
    -jobconf map.output.key.field.separator=',' \
    -jobconf num.key.fields.for.partition=1 \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
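
To make the separator and partition settings concrete, here is a small hypothetical example (not from the original post): with the settings above, a map output line such as 1,2,3,4 is split on ','; the first two fields (1,2) become the key and the rest (3,4) the value; the key is then split on ',' again and only its first field (1) is used to pick the reduce bucket, so every record whose first column is 1 goes to the same reducer.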

-jobconf mapred.text.key.partitioner.options=-k2,3 \  is an extension of  -jobconf num.key.fields.for.partition=1 \
It means that columns 2 and 3 of the key are used for partitioning.
When no partitioning options are set, the default behavior applies; in either case records are
first split into buckets, and then sorted by key within each bucket.
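
For instance (a hypothetical illustration), with a key of three comma-separated fields a,b,c, the option -k2,3 would make fields 2 through 3 (b and c) decide the bucket instead of the first field.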


Addendum:

You can control the number of map tasks by compressing the input files: each compressed (non-splittable) file corresponds to one map task.

This also leaves the input paths untouched, i.e. the directory structure can stay the same.
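
A minimal sketch of that idea (the file names and the /input directory are assumptions for illustration):

# gzip each input file separately; a .gz file is not splittable,
# so the job starts one map task per file
gzip 1.txt 2.txt
# upload the compressed files to HDFS, keeping the directory layout unchanged
/usr/local/src/hadoop-2.6.1/bin/hadoop fs -put 1.txt.gz 2.txt.gz /input/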




Reposted from: https://www.cnblogs.com/taozizainali/p/8811893.html
