

[Python Learning Series 4] Submitting a Python Program to a Hadoop Cluster via hadoop-streaming to Run MapReduce

Published: 2025/4/16
Scenario: submit a Python program to a Hadoop cluster via hadoop-streaming and run it as a MapReduce job.
Reference: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/


1. Writing the Mapper in Python
   The mapper reads data from standard input (stdin), splits each line into words on whitespace, and writes each word and its count to standard output (stdout). The map step does not total the occurrences of a word; it simply emits "word, 1" for every occurrence, leaving the aggregation to the reduce step.

   Code:

#!/usr/bin/env python
#coding:utf-8
# NOTE: the shebang must be the very first line so task nodes can execute this file directly
'''
Created on 2017-06-07
@author: fjs
'''
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

2. Writing the Reducer in Python
   The reducer reads mapper.py's output from standard input (stdin), totals the occurrences of each word, and writes the result to standard output (stdout).
   Code:

#!/usr/bin/env python
#coding:utf-8
# NOTE: the shebang must be the very first line so task nodes can execute this file directly
'''
Created on 2017-06-07
@author: fjs
'''
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
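Before uploading anything to the cluster, the map-sort-reduce pipeline can be exercised locally. The cluster scripts above are Python 2; the sketch below re-implements the same logic as Python 3 generator functions purely for local testing (the function names `mapper`/`reducer` are illustrative, not part of any Hadoop API):

```python
# Local simulation of the streaming pipeline: map, sort by key, reduce.
# Same logic as mapper.py / reducer.py above, written as Python 3 generators.

def mapper(lines):
    # emit (word, 1) for every whitespace-separated token
    for line in lines:
        for word in line.strip().split():
            yield (word, 1)

def reducer(pairs):
    # pairs must arrive sorted by key, exactly as Hadoop
    # guarantees for reducer input after the shuffle phase
    current_word, current_count = None, 0
    for word, count in pairs:
        if word == current_word:
            current_count += count
        else:
            if current_word is not None:
                yield (current_word, current_count)
            current_word, current_count = word, count
    if current_word is not None:
        yield (current_word, current_count)

sample = ["foo foo quux labs foo bar quux"]
counts = dict(reducer(sorted(mapper(sample))))
print(counts)  # {'bar': 1, 'foo': 3, 'labs': 1, 'quux': 2}
```

The `sorted()` call stands in for Hadoop's shuffle-and-sort; without it the reducer's "current word" logic would miscount.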
3. Preparing the Files
   1) Upload the Python scripts to a Hadoop client machine and make them executable:
   # chmod +x /data/etlcj/python/mapper.py
   # chmod +x /data/etlcj/python/reducer.py
   2) Create a test input file:
   # vi /data/etlcj/python/wcin.txt
   with the contents:

foo foo quux labs foo bar quux abc bar see you by test welcome test abc labs foo me python hadoop ab ac bc bec python

   and upload it to the cluster:
   # hadoop fs -put /data/etlcj/python/wcin.txt /apps/etlcj/python/


4. Running the MapReduce Job via hadoop-streaming

   Command:

# hadoop jar /usr/hdp/2.5.3.0-37/hadoop-mapreduce/hadoop-streaming-2.7.3.2.5.3.0-37.jar -files '/data/etlcj/python/mapper.py,/data/etlcj/python/reducer.py' -input /apps/etlcj/python/wcin.txt -output /apps/etlcj/python/out/ -mapper ./mapper.py -reducer ./reducer.py

   The job failed with:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 126
    at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Exit code 126 is the shell's "command found but cannot execute" status. The most likely cause here is that `#!/usr/bin/env python` was not the first line of the scripts (in the original files it appeared after the `#coding` declaration and docstring), so the task nodes could not run `./mapper.py` directly; a missing execute bit on the shipped files would produce the same error. The submission procedure itself is sound.
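The two conditions above can be checked locally before resubmitting the job. A minimal sketch (the helper name `check_streaming_script` is mine, not a Hadoop API):

```python
# Verify that a streaming script is directly executable by a task's shell:
# the shebang must be the first bytes of the file, and the user execute bit set.
import os
import stat

def check_streaming_script(path):
    with open(path, "rb") as f:
        first_line = f.readline()
    has_shebang = first_line.startswith(b"#!")
    executable = bool(os.stat(path).st_mode & stat.S_IXUSR)
    return has_shebang and executable
```

Moving `#!/usr/bin/env python` to line 1 of mapper.py and reducer.py (with the `#coding` declaration on line 2, as PEP 263 requires) is the usual fix for this error.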


5. hadoop-streaming Parameter Reference
  Usage: hadoop jar $HADOOP_HOME/hadoop-streaming-*.jar
   -input <input path> \          # may be given multiple times, e.g. -input '/user/foo/dir1' -input '/user/foo/dir2'
   -inputformat <JavaClassName> \
   -output <output path> \
   -outputformat <JavaClassName> \
   -mapper <mapper executable or JavaClassName> \
   -reducer <reducer executable or JavaClassName> \
   -combiner <combiner executable or JavaClassName> \
   -partitioner <JavaClassName> \
   -cmdenv <name=value> \         # environment variables passed to the tasks; may be given multiple times
   -file <file> \                 # ship dependencies such as config files or dictionaries
   -D <name=value>                # job configuration properties
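A long streaming invocation is easy to mistype; one option is to assemble it programmatically. A sketch (the helper `build_streaming_cmd` is mine; the jar and HDFS paths mirror section 4, and the `PYTHONIOENCODING` variable is only an illustration of `-cmdenv`):

```python
# Assemble a hadoop-streaming command line from an ordered list of options.
# Paths below are the illustrative ones used in section 4.
def build_streaming_cmd(jar, options):
    parts = ["hadoop", "jar", jar]
    for flag, value in options:
        parts.extend([flag, value])
    return " ".join(parts)

cmd = build_streaming_cmd(
    "/usr/hdp/2.5.3.0-37/hadoop-mapreduce/hadoop-streaming-2.7.3.2.5.3.0-37.jar",
    [
        ("-files", "/data/etlcj/python/mapper.py,/data/etlcj/python/reducer.py"),
        ("-input", "/apps/etlcj/python/wcin.txt"),
        ("-output", "/apps/etlcj/python/out/"),
        ("-mapper", "./mapper.py"),
        ("-reducer", "./reducer.py"),
        ("-cmdenv", "PYTHONIOENCODING=utf-8"),
    ],
)
print(cmd)
```

Keeping the options as data also makes it straightforward to vary one parameter (say, the output directory) between runs.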

