日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce DataJoin 链接多数据源

發布時間:2025/7/14 编程问答 21 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce DataJoin 链接多数据源 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
  • 主要介紹用DataJoin類來鏈接多數據源,先看一下例子,假設二個數據源customs和orders

    customer ID?????? Name????? PhomeNumber

    1??????????????????????? 趙一??????? 025-5455-566

    2??????????????????????? 錢二??????? 025-4587-565

    3??????????????????????? 孫三??????? 021-5845-5875

    客戶的訂單號:

    Customer ID???? order ID???? Price??? Data

    2????????????????????????? 1?????????????? 93?????? 2008-01-08

    3????????????????????????? 2?????????????? 43?????? 2012-01-21

    1????????????????????????? 3?????????????? 43?????? 2012-05-12

    2????????????????????????? 4?????????????? 32?????? 2012-5-14

    問題:現在要生成訂單

    customer ID??? name??? PhomeNumber???? Price???? Data

    2????????????????????? 錢二???? 025-4587-565??????? 93????????? 2008-01-08

    上面是一個例子,下面介紹一下hadoop中DataJoin類具體的做法。

    首先,需要為不同數據源下的每個數據定義一個數據標簽,這一點不難理解,就是標記數據的出處。

    其次,需要為每個待鏈接的數據記錄確定一個鏈接主鍵,這一點不難理解。DataJoin類庫分別在map階段和Reduce階段提供一個處理框架,盡可能幫助程序員完成一些處理的工作,僅僅留下一些必須工作,由程序完成。

    Map階段
    DataJoin類庫里有一個抽象基類DataJoinMapperBase,該基類實現了map方法,該方法為對每個數據源下的文本的記錄生成一個帶表見的數據記錄對象。但是程序必須指定它是來自于哪個數據源,即Tag,還要指定它的主鍵是什么即GroupKey。如果指定了Tag和GroupKey,那么map將會生成一下的記錄,customer表為例

    customers???????? 1??????????????? 趙一??????? 025-5455-566;?????? customers???????? 2??????????????? 錢二??????? 025-4587-565;

    Map過程中Tag和GroupKey都是程序員給定,所以要肯定要就有接口供程序員去實現,DataJoinMapperBase實現下面3個接口。

    abstract Text gernerateInputTag(String inuptFile), 看方法名就知道是設置Tag。

    abstract Text generateGroupKey(TaggedMapOutput lineRecord), 該方法是設置GroupKey,其中,lineRecord是數據源中的一行數據,該方法可以在這一行數據上設置任意的GroupKey為主鍵。

    abstract TaggedMapOutput generateMapOutput(object value), 該抽象方法用于把數據源中的原始數據記錄包裝成一個帶標簽的數據源。TaggedMapOutputs是一行記錄的數據類型。代碼如下:

    ?

    view sourceprint? 01.import?org.apache.hadoop.contrib.utils.join.*; 02.import?org.apache.hadoop.contrib.utils.join.TaggedMapOutput; 03.import?org.apache.hadoop.io.Text; 04.? 05.public?class?MapClass?extends?DataJoinMapperBase{ 06.? 07.@Override 08.protected?Text generateGroupKey(TaggedMapOutput arg0) { 09.String line = ((Text)arg0.getData()).toString(); 10.String[] tokens = line.split(","); 11.String groupKey = tokens[0]; 12.return?new?Text(groupKey); 13.} 14.? 15.@Override 16.protected?Text generateInputTag(String arg0) { 17.? 18.String dataSource = arg0.split("-")[0]; 19.return?new?Text(dataSource); 20.} 21.? 22.@Override 23.protected?TaggedMapOutput generateTaggedMapOutput(Object arg0) { 24.TaggedWritable tw =?new?TaggedWritable((Text)arg0); 25.tw.setTag(this.inputTag); 26.return?tw; 27.} 28.}
    view sourceprint? 01.import?java.io.DataInput; 02.import?java.io.DataOutput; 03.import?java.io.IOException; 04.import?org.apache.hadoop.contrib.utils.join.TaggedMapOutput; 05.import?org.apache.hadoop.io.Text; 06.import?org.apache.hadoop.io.Writable; 07.? 08.public?class?TaggedWritable?extends?TaggedMapOutput{ 09.? 10.private?Writable data; 11.public?TaggedWritable(Writable data) {? 12.this.tag =?new?Text("");? 13.this.data = data;? 14.} 15.? 16.@Override 17.public?Writable getData() { 18.return?data; 19.} 20.? 21.@Override 22.public?void?readFields(DataInput arg0)?throws?IOException { 23.this.tag.readFields(arg0); 24.this.data.readFields(arg0); 25.} 26.? 27.@Override 28.public?void?write(DataOutput arg0)?throws?IOException { 29.this.tag.write(arg0); 30.this.data.write(arg0);??? 31.} 32.}

    每個記錄的數據源標簽可以由generateInputTag()產生,通過setTag()方法設置記錄的Tag。

    note:1.該記錄不是關系數據庫,是文本文件,2.?TaggedMapOutput在import org.apache.hadoop.contrib.utils.join.*頭文件中,有的時候在eclipse下,每個這個頭文件,這時 ? 只要找到你的hadoop的目錄下contrib/datajoin文件加,把jar文件導入eclipse中即可。

    Reduce 階段

    DataJoinReduceBase中已經實現reduce()方法,具有同一GroupKey的數據分到同一Reduce中,通過reduce的方法將對來自不同的數據源和據用相同的GroupKey做一次叉積組合。這個比較難懂,舉個例子:


    customers???????? 2??????????????? 錢二??????? 025-4587-565;

    orders????? 2??????????????? 1?????????????? 93?????? 2008-01-08;?

    orders 2?????????? 4?????????????? 32?????? 2012-5-14
    ?


    按照map()結果的數據,就是下表給出的結果(3個記錄),他們都有一個共同的GroupKey,帶來自于二個數據源,所以叉積的結果為


    customers???????? 2??????????????? 錢二??????? 025-4587-565

    orders????? 2??????????????? 1?????????????? 93?????? 2008-01-08
    ?customers???????? 2??????????????? 錢二??????? 025-4587-565

    orders 2?????????? 4?????????????? 32?????? 2012-5-14
    ?

    如果Reduce階段看懂了,基本上這個就搞定了,Reduce是系統做的,不需要用戶重載,接下來的工作就是要實現一個combine()函數,它的作用是將每個叉積合并起來,形成訂單的格式。

    代碼如下:

    view sourceprint? 01.import?org.apache.hadoop.conf.Configuration; 02.import?org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; 03.import?org.apache.hadoop.contrib.utils.join.TaggedMapOutput; 04.import?org.apache.hadoop.fs.Path; 05.import?org.apache.hadoop.io.Text; 06.import?org.apache.hadoop.mapred.JobClient; 07.import?org.apache.hadoop.mapred.JobConf; 08.import?org.apache.hadoop.mapred.jobcontrol.Job; 09.import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 10.import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 11.? 12.public?class?ReduceClass?extends?DataJoinReducerBase{ 13.? 14.@Override 15.protected?TaggedMapOutput combine(Object[] tags, Object[] values) { 16.if(tags.length<2)return?null; 17.StringBuffer joinData =?new?StringBuffer(); 18.int?count=0; 19.? 20.for(Object value: values){ 21.joinData.append(","); 22.TaggedWritable tw = (TaggedWritable)value; 23.String recordLine = ((Text)tw.getData()).toString(); 24.String[] tokens = recordLine.split(",",2); 25.if(count==0) joinData.append(tokens[0]); 26.joinData.append(tokens[1]); 27.} 28.? 29.TaggedWritable rtv =?new?TaggedWritable(new?Text(new?String(joinData))); 30.rtv.setTag((Text)tags[0]); 31.return?rtv; 32.} 33.? 34.public?static?void?main(String[] args){ 35.? 36.Configuration conf =?new?Configuration();?? 37.JobConf job =?new?JobConf(conf, ReduceClass.class);? 38.? 39.Path in =?new?Path(args[0]);? 40.Path out =?new?Path(args[1]);? 41.FileInputFormat.setInputPaths(job, in);? 42.FileOutputFormat.setOutputPath(job, out);? 43.job.setJobName("DataJoin");? 44.job.setMapperClass(MapClass.class);? 45.job.setReducerClass(ReduceClass.class);? 46.? 47.job.setInputFormat(TextInputFormat.class);? 48.job.setOutputFormat(TextOutputFormat.class);? 49.job.setOutputKeyClass(Text.class);? 50.job.setOutputValueClass(TaggedWritable.class);? 51.job.set("mapred.textoutputformat.separator",?",");? 52.JobClient.runJob(job); 53.? 54.} 55.}

總結

以上是生活随笔為你收集整理的MapReduce DataJoin 链接多数据源的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。