日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Hadoop:The Definitive Guid 总结 Chapter 1~2 初识Hadoop、MapReduce

發布時間:2025/3/20 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Hadoop:The Definitive Guid 总结 Chapter 1~2 初识Hadoop、MapReduce 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

?

1.數據存儲與分析

問題:當磁盤的存儲量隨著時間的推移越來越大的時候,對磁盤上的數據的讀取速度卻沒有多大的增長

從多個磁盤上進行并行讀寫操作是可行的,但是存在以下幾個方面的問題:

1).第一個問題是硬件錯誤。使用的硬件越多出錯的幾率就越大。一種常用的解決方式是數據冗余,保留多分拷貝,即使一份數據處理出錯,還有另外的數據。HDFS使用的也是類似的方式,但稍有不同。

2).第二個問題是數據處理的相關性問題。例如很多分析工作在一快磁盤上處理出來的結果需要與其他磁盤上處理處理出來的結果合并才能完成任務。各種分布式系統也都給出了合并的策略,但是做好這方面確實是一個挑戰。MapReduce提供了一種編程模型,他將從硬盤上讀寫數據的問題抽象出來,轉化成對一系列鍵值對的計算

簡而言之,Hadoop提供了一個可靠的存儲和分析系統。存儲又HDFS提供,分析由MapReduce提供。

?

2.與其他系統比較

1).RDBMS

為什么需要MapReduce

a.磁盤的尋道時間提高的速度低于數據的傳輸速度,如果數據訪問模式由尋道時間支配的話,在讀寫數據集的一大部分的時候速度就會較流式讀取慢很多,這樣就出現了瓶頸。

b.另一方面在更新數據集的少量數據的時候,傳統的B-樹工作的比較好,但是在更新數據集的大部分數據的時候B-樹就顯得比MapReduce方式慢了。MapReduce使用排序/合并操作去重建數據庫(完成數據更新).

c.MapReduce比較適合于需要分析整個數據集,并且要使用批處理方式,特別是特定的分析的情況;RDBMS點查詢方面占優勢,或在已編制索引的數據集提供低延遲的檢索和更新的數據,但是數據量不能太大。MapReduce適合一次寫入,多次讀取的操作,但是關系數據庫就比較適合對數據集的持續更新。

d.MapReduce比較適合處理半結構化,非結構化的數據

e.MapReduce是可以進行線性擴展的編程模型。一個對集群級別的數據量而寫的MapReduce可以不加修改的應用于小數據量或者更大數據量的處理上。更重要的是當你的輸入數據增長一倍的時候,相應的處理時間也會增加一倍。但是如果你把集群也增長一倍的話,處理的速度則會和沒有增加數據量時候的速度一樣快,這方面對SQL查詢來說不見得是正確的。

f.關系數據往往進行規則化以保證數據完整性,并刪除冗余。這樣做給MapReduce提出了新的問題:它使得讀數據變成了非本地執行,而MapReduce的一個重要前提(假設)就是數據可以進行高速的流式讀寫

?

2).Grid Compuing 網格計算

a.MapReduce使數據和計算在一個節點上完成,這樣就變成了本地的讀取。這是MapReduce高性能的核心.

b.MPI將控制權大大的交給了程序員,但是這就要求程序員明確的處理數據流等情況,而MapReduce只提供高層次的操作:程序員只需考慮處理鍵值對的函數,而對數據流則是比較隱晦的。

c.MapReduce是一種非共享(Shared-nothing)的架構,當MapReduce實現檢測到map或者reduce過程出錯的時候,他可以將錯誤的部分再執行一次MPI程序員則需要明確的考慮檢查點和恢復,這雖然給程序員很大自由,但是也使得程序變得難寫。

?

3).志愿計算

MapReduce是針對在一個高聚合網絡連接的數據中心中進行的可信的、使用專用的硬件工作持續數分鐘或者數個小時而設計的。相比之下,志愿計算則是在不可信的、鏈接速度有很大差異的、沒有數據本地化特性的,互聯網上的計算機上運行永久的(超長時間的)計算,

?

3.天氣數據集

數據是NCDC的數據,我們關注以下特點:

1)? 數據是半格式化的

2)? 目錄里面存放的是從1901-2001年一個世紀的記錄,是gzip壓縮過的文件。

3)? 以行為單位,使用ASCII格式存儲,每行就是一條記錄

4)? 每條記錄我們關注一些基本的元素,比如溫度,這些數據在每條數據中都會出現,并且寬度也是固定的。

下面是一條記錄的格式,為了便于顯示,做了一部分調整。

?

?

?

4.使用Unix工具分析數據

以分析某年份的最高溫度為例,下面是一段Unix的腳本程序:

#!/usr/bin/env bash for year in all/* doecho -ne `basename $year .gz`"\t"gunzip -c $year | \awk '{ temp = substr($0, 88, 5) + 0;q = substr($0, 93, 1);if (temp !=9999 && q ~ /[01459]/ && temp > max) max = temp }END { print max }' done

這段腳本的執行過程如下:

腳本循環處理每一個壓縮的年份文件,首先打印出年份,然后對每一個文件使用awk處理。Awk腳本從數據中解析出兩個字段:一個air temperature,一個quality code。air temperature值加0被轉換成整形。接下來查看溫度數據是否有效(9999表示在NCDC數據集中丟失的值),并且檢查quality code是不是可信并沒有錯誤的。如果讀取一切正常,temp將與目前的最大值比較,如果出現新的最大值,則更新當前max的值。當文件中所有行的數據都被處理之后,開始執行End程序塊,并且打印出最大值。

下面是某次運行結果的起始部分:

為了加速處理速度,我們將程序的某些部分進行并行執行。這在理論上是比較簡單的,我們可以按照年份來在不同的處理器上執行,使用所有可用的硬件線程,但是還是有些問題:

1).把任務切分成相同大小的塊不總是那么容易的。

2).合并單獨處理出來的結果還需要進一步的處理

3).人們仍舊被單機的處理能力所束縛。

?

5.?使用Hadoop分析數據

1).Map和Reduce

MapReduce將工作分為map階段和reduce階段,每個階段都將鍵值對作為輸入輸入,鍵值對的類型可以由程序員選擇。程序員還指定兩個函數:map和reduce函數。

Map階段的輸入數據是NCDC的原始數據,我們選擇文本格式輸入,這樣可以把記錄中的每一行作為文本value。Key是當前行離開始行的偏移量,但是我們并不需要這個信息,所以忽略不要。

我們的Map函數比較簡單,僅僅從輸入中析取出temperature。其中,map函數僅僅是完成了數據的準備階段,這樣使得reducer函數可以基于它查找歷年的最高溫度。Map函數也是一個很好的過濾階段,這里可以過濾掉丟失、置疑、錯誤的temperature數據。

下面是輸入數據樣例:

下面這些行以鍵值對的方式來給map函數處理,其中加粗的是我們需要處理的數據

上面的鍵(key)是文件中的行偏移量,map函數不需要,這里的map函數的功能僅限于提取年份和氣溫,并將他們作為輸出:

map函數輸出經由mapreduce框架中進行進一步的處理后,主要需要根據鍵對鍵/值對進行排序和分組。經過這一番處理之后,Reduce收來的結果如下:

處理這些數據,reduce所需要做的工作僅僅是遍歷這些數據,找出最大值,產生最終的輸出結果:

MapReduce的邏輯數據流:

?

?

2).Java MapReduce 程序

這里需要三塊代碼:Map函數、Reduce函數、用來運行作業的main函數

Map函數的實現

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;public class MaxTemperatureMapper extendsMapper<LongWritable, Text, Text, IntWritable> {private static final int MISSING = 9999;@Overridepublic void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();String year = line.substring(15, 19);int airTemperature;if (line.charAt(87) == '+') { // parseInt doesn't like leading plus// signsairTemperature = Integer.parseInt(line.substring(88, 92));} else {airTemperature = Integer.parseInt(line.substring(87, 92));}String quality = line.substring(92, 93);if (airTemperature != MISSING && quality.matches("[01459]")) {context.write(new Text(year), new IntWritable(airTemperature));}} }

Hadoop提供了他自己的基本類型,這些類型為網絡序列化做了專門的優化。可以在org.apache.hadoop.io包中找到他們。比如LongWritable相當于Java中的LongText相當于StringIntWritable在相當于Integermap()方法傳入一個key和一個value。我們將Text類型的value轉化成JavaString,然后用Stringsubstring方法取出我偶們需要的部分,最后利用context.write按照鍵/值的形式收集數據。

?

Reduce函數的實現

?

import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;public class MaxTemperatureReducer extendsReducer<Text, IntWritable, Text, IntWritable> {@Overridepublic void reduce(Text key, Iterable<IntWritable> values, Context context)throws IOException, InterruptedException {int maxValue = Integer.MIN_VALUE;for (IntWritable value : values) {maxValue = Math.max(maxValue, value.get());}context.write(key, new IntWritable(maxValue));} }

Reduce的輸入類型必須是:Text,IntWritable類型。Reduce在本例輸出結果是Text和IntWritbale類型,year和與其對應的maxValue是經過遍歷、比較之后得到的。

?

負責運行MapReduce作業的main函數

import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class MaxTemperature {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: MaxTemperature <input path> <output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(MaxTemperature.class);job.setJobName("Max temperature");
FileInputFormat.addInputPath(job,
new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.
class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(
true) ? 0 : 1);} }

?


6.數據流

1).MapReduce作業的工作單元由如下組成:輸入數據、MapReduce程序、配置信息

2).Hadoop將Task分成兩類:MapTask、ReduceTask.

3).為了控制Hadoop的Job運行,Hadoop有兩累節點:一種是jobtracker,一種是tasktrackerJobtracker通過調度tasktracker協調所有工作的執行。Tasktracker運行任務并將報告發送給jobtrackerjobtracker所有工作的進度。如果一個任務失敗,jobtracker再重新調度一個不同的tasktracker進行工作。但是在Hadoop 0.23和2.0版本ongoing他們已經被Resource Manager、Node Manager和ApplicationMaster所替代,相應的資源并被Container所封裝

4).Hadoop將MapReduce的輸入數據分成固定大小的分片,成為數據分片(input split),然后為每個分片創建一個MapTask (分片默認64M大小)

5).Hadoop可以通過在存儲有輸入的數據節點上運行相應Map任務,提高性能(數據本地優化),另外Map任務將其樹立后的數據寫到本地磁盤,減少Hadoop分布式Node之間的數據傳輸壓力。相比之下任務沒有數據本地優化的優勢-----單個Reduce任務的輸入通常來自所有的map輸出

下圖展現了Data-local(數據本地),Rack-local與Off-local Map任務的區別:

下圖給出MapReduce的集中執行方式數據流的情況:

?一個Reduce的情況:

兩個Reduce的情況:

無Reduce情況:

?

?

?

?

?

?

7.Combiner函數

為了優化集群數據傳輸,減少Map任務和Reduce任務之間的數據傳輸,Hadop針對Map任務指定一個Combiner函數(合并函數)--對本地鍵(key)相同的做合并處理,實際上Combiner函數很像Reduce函數,只不過運行在本地,最后Combiner函數的輸出作為作為Reduce的函數的輸入,Hadoop執行函數的順序就變為Map--->Combiner--->Reduce

示例如下:

第一個Map的輸出:

第二個Map的輸出:

Reduce函數被調用時:

最后輸出結果:

?

指定一個Combiner函數,這里用Reduce函數作為Combiner函數

?

public class MaxTemperatureWithCombiner {public static void main(String[] args) throws Exception {if (args.length != 2) {System.err.println("Usage: MaxTemperatureWithCombiner <input path> "+ "<output path>");System.exit(-1);}Job job = new Job();job.setJarByClass(MaxTemperatureWithCombiner.class);job.setJobName("Max temperature");
FileInputFormat.addInputPath(job,
new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.
class);job.setCombinerClass(MaxTemperatureReducer.class);job.setReducerClass(MaxTemperatureReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(
true) ? 0 : 1);} }

?

8.Hadoop的Streaming

Streaming天生適合用于文本處理,在文本模式下使用時,它有一個數據的行視圖,map的輸入數據通過標準輸入流傳遞給map函數,并且一行一行的傳輸,最后將結果行寫到標準輸出。

map輸出的鍵/值對是以一個制表符分隔的行,它以這樣的形式寫到標準輸出,reduce函數的輸入格式相同----通過制表符來分隔的鍵/值對----并通過標準輸入流進行傳輸

1)ruby版本

2)Python版本

?

9.Hadoop的Pipes

?

?

?

?

?

?

?

?

轉載于:https://www.cnblogs.com/biyeymyhjob/archive/2012/08/08/2628265.html

總結

以上是生活随笔為你收集整理的Hadoop:The Definitive Guid 总结 Chapter 1~2 初识Hadoop、MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。