

Implementing a Hadoop MapReduce Program in Python


Based on the two articles above, what follows is the process as I ran it on my own Ubuntu machine. The text largely follows the blog post 《使用Python實現Hadoop MapReduce程序》, since retyping everything would be a waste of time.

In this example, I will show you how to write a simple MapReduce program for Hadoop in Python.

Although the Hadoop framework is written in Java, Hadoop programs can still be implemented in languages such as C++ or Python. The example program on the official Hadoop website is written in Jython and packaged into a jar file, which is clearly inconvenient; it does not have to be done that way, because we can program against Hadoop directly in Python. Take a look at the example in /src/examples/python/WordCount.py and you will see what I mean.

What do we want to do?

We will write a simple MapReduce program in CPython, rather than a Jython program packaged into a jar file.
Our example mimics WordCount in Python: it reads text files and counts how often each word occurs. The results are also output as text, with each line containing a word and its occurrence count, separated by a tab.
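For instance, if the word foo occurred three times and bar once, two lines of the final output would look like this:

[plain]
foo     3
bar     1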

Prerequisites

Before writing this program, you need to have a Hadoop cluster set up, so that you will not be left scrambling later. If you have not set one up, there is a short tutorial below for building one on Ubuntu Linux (it applies equally to other Linux and Unix distributions).

For how to set up Hadoop on Ubuntu Linux in single-node and pseudo-distributed mode, see the post 《Ubuntu上搭建Hadoop環境(單機模式+偽分布模式)》.

The MapReduce code in Python

The trick to writing MapReduce code in Python is to use Hadoop Streaming, which passes data between the Map and Reduce steps via STDIN (standard input) and STDOUT (standard output). We simply read input with Python's sys.stdin and write output with sys.stdout; Hadoop Streaming takes care of everything else. Really, that is all there is to it!
Map: mapper.py

Save the following code in /usr/local/hadoop/mapper.py. It reads data from STDIN, splits each line into words, and emits one line per word, mapping the word to its occurrence count:
Note: make sure this script has execute permission (chmod +x mapper.py).

[python]
#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

This script does not compute the total number of occurrences of each word. It immediately emits "<word> 1", even though <word> may appear multiple times in the input; the counting is left to the subsequent Reduce step (or program). Of course, you can change the coding style, entirely according to your own habits.

Reduce: reducer.py


Save the following code in /usr/local/hadoop/reducer.py. This script reads the results of mapper.py from STDIN, sums the occurrence counts of each word, and writes the results to STDOUT.

Again, mind the script's permissions: chmod +x reducer.py

[python]
#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Testing your code (cat data | map | sort | reduce)

I recommend testing your mapper.py and reducer.py scripts by hand before running the full MapReduce job, so that you do not end up with no results and no idea why.

Here are some suggestions for testing the functionality of your Map and Reduce:

[plain]
hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py
foo     1
foo     1
quux    1
labs    1
foo     1
bar     1
quux    1
hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py | sort | ./reducer.py
bar     1
foo     3
labs    1
quux    2

[plain]
# using one of the ebooks as example input
# (see below on where to get the ebooks)
hadoop@derekUbun:/usr/local/hadoop$ cat book/book.txt | ./mapper.py
subscribe    1
to           1
our          1
email        1
newsletter   1
to           1
hear         1
about        1
new          1
eBooks.      1

Running the Python scripts on Hadoop


For this example, we will need an ebook; put it at /usr/local/hadoop/book/book.txt:
[plain]
hadoop@derekUbun:/usr/local/hadoop$ ls -l book
總用量 636
-rw-rw-r-- 1 derek derek 649669  3月 12 12:22 book.txt

Copying local data to HDFS

Before we run the MapReduce job, we need to copy the local files into HDFS:


[plain]
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -copyFromLocal /usr/local/hadoop/book book
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -ls
Found 3 items
drwxr-xr-x   - hadoop supergroup          0 2013-03-12 15:56 /user/hadoop/book
Running the MapReduce job

Now everything is ready, and we can run the Python MapReduce job on the Hadoop cluster. As I said above, we use Hadoop Streaming to pass data between Map and Reduce via STDIN and STDOUT.

[plain]
hadoop@derekUbun:/usr/local/hadoop$ hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
    -mapper /usr/local/hadoop/mapper.py \
    -reducer /usr/local/hadoop/reducer.py \
    -input book/* \
    -output book-output
While running a job, if you want to change some Hadoop settings, such as increasing the number of Reduce tasks, you can use the "-jobconf" option:

[plain]
hadoop@derekUbun:/usr/local/hadoop$ hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
    -jobconf mapred.reduce.tasks=4 \
    -mapper /usr/local/hadoop/mapper.py \
    -reducer /usr/local/hadoop/reducer.py \
    -input book/* \
    -output book-output

If either of the two runs above fails, refer to the command below. Note that to rerun a job you must first delete the output directory in dfs; a suitable command is shown right after this listing.

[plain]
bin/hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar \
    -mapper task1/mapper.py \
    -file task1/mapper.py \
    -reducer task1/reducer.py \
    -file task1/reducer.py \
    -input url \
    -output url-output \
    -jobconf mapred.reduce.tasks=3
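For reference, the output directory left by a previous run can be removed with the Hadoop 1.x dfs -rmr command; for example, with the book-output directory used above:

[plain]
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -rmr book-output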

An important note: Hadoop does not honor mapred.map.tasks beyond treating it as a hint (a user-specified mapred.reduce.tasks, however, is accepted as-is). The job will read the files in the HDFS directory book, process them, and store the results in separate files in the HDFS directory book-output. The output of the earlier run looks like this:

[plain]
hadoop@derekUbun:/usr/local/hadoop$ hadoop jar contrib/streaming/hadoop-streaming-1.1.2.jar -jobconf mapred.reduce.tasks=4 -mapper /usr/local/hadoop/mapper.py -reducer /usr/local/hadoop/reducer.py -input book/* -output book-output
13/03/12 16:01:05 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/usr/local/hadoop/tmp/hadoop-unjar4835873410426602498/] [] /tmp/streamjob5047485520312501206.jar tmpDir=null
13/03/12 16:01:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/03/12 16:01:06 WARN snappy.LoadSnappy: Snappy native library not loaded
13/03/12 16:01:06 INFO mapred.FileInputFormat: Total input paths to process : 1
13/03/12 16:01:06 INFO streaming.StreamJob: getLocalDirs(): [/usr/local/hadoop/tmp/mapred/local]
13/03/12 16:01:06 INFO streaming.StreamJob: Running job: job_201303121448_0010
13/03/12 16:01:06 INFO streaming.StreamJob: To kill this job, run:
13/03/12 16:01:06 INFO streaming.StreamJob: /usr/local/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201303121448_0010
13/03/12 16:01:06 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201303121448_0010
13/03/12 16:01:07 INFO streaming.StreamJob:  map 0%  reduce 0%
13/03/12 16:01:10 INFO streaming.StreamJob:  map 100%  reduce 0%
13/03/12 16:01:17 INFO streaming.StreamJob:  map 100%  reduce 8%
13/03/12 16:01:18 INFO streaming.StreamJob:  map 100%  reduce 33%
13/03/12 16:01:19 INFO streaming.StreamJob:  map 100%  reduce 50%
13/03/12 16:01:26 INFO streaming.StreamJob:  map 100%  reduce 67%
13/03/12 16:01:27 INFO streaming.StreamJob:  map 100%  reduce 83%
13/03/12 16:01:28 INFO streaming.StreamJob:  map 100%  reduce 100%
13/03/12 16:01:29 INFO streaming.StreamJob: Job complete: job_201303121448_0010
13/03/12 16:01:29 INFO streaming.StreamJob: Output: book-output
hadoop@derekUbun:/usr/local/hadoop$
As you can see from the output above, Hadoop also provides a basic web interface for job statistics and information. While the Hadoop cluster is running, you can open http://localhost:50030/ in a browser.


Check whether the results have been output and stored in the HDFS directory book-output:

[plain]
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -ls book-output
Found 6 items
-rw-r--r--   2 hadoop supergroup          0 2013-03-12 16:01 /user/hadoop/book-output/_SUCCESS
drwxr-xr-x   - hadoop supergroup          0 2013-03-12 16:01 /user/hadoop/book-output/_logs
-rw-r--r--   2 hadoop supergroup         33 2013-03-12 16:01 /user/hadoop/book-output/part-00000
-rw-r--r--   2 hadoop supergroup         60 2013-03-12 16:01 /user/hadoop/book-output/part-00001
-rw-r--r--   2 hadoop supergroup         54 2013-03-12 16:01 /user/hadoop/book-output/part-00002
-rw-r--r--   2 hadoop supergroup         47 2013-03-12 16:01 /user/hadoop/book-output/part-00003
hadoop@derekUbun:/usr/local/hadoop$

You can use the dfs -cat command to inspect the contents of the output files:


[plain]
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -cat book-output/part-00000
about   1
eBooks.     1
the     1
to  2
hadoop@derekUbun:/usr/local/hadoop$
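To inspect all of the output at once instead of one part file at a time, you can also cat every part file together, or merge them into a single local file with the standard getmerge shell command (the local path /tmp/book-output.txt here is only an example):

[plain]
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -cat book-output/part-*
hadoop@derekUbun:/usr/local/hadoop$ hadoop dfs -getmerge book-output /tmp/book-output.txt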
Below are two revised versions of the original English author's mapper.py and reducer.py:

    mapper.py

[python]
#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()

reducer.py

[python]
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
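These revised scripts can be checked locally with the same cat-map-sort-reduce pipeline as before, assuming they are saved over ./mapper.py and ./reducer.py and made executable; the sort stands in for Hadoop's shuffle phase, which groupby relies on to see equal words on consecutive lines:

[plain]
hadoop@derekUbun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py
bar     1
foo     3
labs    1
quux    2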