日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式离线计算—MapReduce—基本原理

發(fā)布時間:2024/4/15 编程问答 33 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式离线计算—MapReduce—基本原理 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

原文作者:黎先生

原文地址:MapReduce基本原理及應用

目錄

一、MapReduce模型簡介

1. Map和Reduce函數(shù)

2. MapReduce體系結構

3. MapReduce工作流程

4. MapReduce應用程序執(zhí)行過程

二 、WordCount運行實例

1.?WordCount的Map過程

2. WordCount的Reduce過程

3. WordCount源碼


一、MapReduce模型簡介

  MapReduce將復雜的、運行于大規(guī)模集群上的并行計算過程高度地抽象到了兩個函數(shù):Map和Reduce。它采用“分而治之”策略,一個存儲在分布式文件系統(tǒng)中的大規(guī)模數(shù)據(jù)集,會被切分成許多獨立的分片(split),這些分片可以被多個Map任務并行處理

1. Map和Reduce函數(shù)

Map和Reduce

2. MapReduce體系結構

  MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task

  1)Client

  用戶編寫的MapReduce程序通過Client提交到JobTracker端 用戶可通過Client提供的一些接口查看作業(yè)運行狀態(tài)

  2)JobTracker

  JobTracker負責資源監(jiān)控和作業(yè)調(diào)度 JobTracker 監(jiān)控所有TaskTracker與Job的健康狀況,一旦發(fā)現(xiàn)失敗,就將相應的任務轉移到其他節(jié)點 JobTracker 會跟蹤任務的執(zhí)行進度、資源使用量等信息,并將這些信息告訴任務調(diào)度器(TaskScheduler),而調(diào)度器會在資源出現(xiàn)空閑時,

  選擇合適的任務去使用這些資源

  3)TaskTracker

  TaskTracker 會周期性地通過“心跳”將本節(jié)點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker 發(fā)送過來的命令并執(zhí)行相應的操作(如啟動新任務、殺死任務等) TaskTracker 使用“slot”等量劃分本節(jié)點上的資源量(CPU、內(nèi)存等)。一個Task 獲取到

  一個slot 后才有機會運行,而Hadoop調(diào)度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用

  4)Task

  Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動

3. MapReduce工作流程

  1) 工作流程概述

 

  • 不同的Map任務之間不會進行通信
  • 不同的Reduce任務之間也不會發(fā)生任何信息交換
  • 用戶不能顯式地從一臺機器向另一臺機器發(fā)送消息
  • 所有的數(shù)據(jù)交換都是通過MapReduce框架自身去實現(xiàn)的

  2) MapReduce各個執(zhí)行階段

 

4. MapReduce應用程序執(zhí)行過程

 

二 、WordCount運行實例

工作流程是Input從HDFS里面并行讀取文本中的內(nèi)容,經(jīng)過MapReduce模型,最終把分析出來的結果用Output封裝,持久化到HDFS中

1.?WordCount的Map過程

使用三個Map任務并行讀取三行文件中的內(nèi)容,對讀取的單詞進行map操作,每個單詞都以<key, value>形式生成

  

Map端源碼:

public class WordMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken().toLowerCase()); context.write(word, one); } } }

?

2. WordCount的Reduce過程

Reduce操作是對Map的結果進行排序、合并等操作最后得出詞頻

 

Reduce端源碼

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, new IntWritable(sum)); } }

3. WordCount源碼

import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class WordMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer itr = new StringTokenizer(line); while (itr.hasMoreTokens()) { word.set(itr.nextToken().toLowerCase()); context.write(word, one); } } } public static class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(WordMapper.class); job.setCombinerClass(WordReducer.class); job.setReducerClass(WordReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

總結

以上是生活随笔為你收集整理的分布式离线计算—MapReduce—基本原理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。