日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MapReduce 中的两表 join 几种方案简介

發(fā)布時間:2023/12/10 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MapReduce 中的两表 join 几种方案简介 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

1. 概述

在傳統(tǒng)數(shù)據(jù)庫(如:MYSQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進(jìn)行JOIN操作,同樣常見且耗時,由于Hadoop的獨(dú)特設(shè)計思想,當(dāng)進(jìn)行JOIN操作時,有一些特殊的技巧。

本文首先介紹了Hadoop上通常的JOIN實現(xiàn)方法,然后給出了幾種針對不同輸入數(shù)據(jù)集的優(yōu)化方法。

2. 常見的join方法介紹

假設(shè)要進(jìn)行join的數(shù)據(jù)分別來自File1和File2.

2.1 reduce side join

reduce side join是一種最簡單的join方式,其主要思想如下:

在map階段,map函數(shù)同時讀取兩個文件File1和File2,為了區(qū)分兩種來源的key/value數(shù)據(jù)對,對每條數(shù)據(jù)打一個標(biāo)簽(tag),比如:tag=0表示來自文件File1,tag=2表示來自文件File2。即:map階段的主要任務(wù)是對不同文件中的數(shù)據(jù)打標(biāo)簽。

在reduce階段,reduce函數(shù)獲取key相同的來自File1和File2文件的value list, 然后對于同一個key,對File1和File2中的數(shù)據(jù)進(jìn)行join(笛卡爾乘積)。即:reduce階段進(jìn)行實際的連接操作。

REF:hadoop join之reduce side join

http://blog.csdn.net/huashetianzu/article/details/7819244

2.2 map side join

之所以存在reduce side join,是因為在map階段不能獲取所有需要的join字段,即:同一個key對應(yīng)的字段可能位于不同map中。Reduce side join是非常低效的,因為shuffle階段要進(jìn)行大量的數(shù)據(jù)傳輸。

Map side join是針對以下場景進(jìn)行的優(yōu)化:兩個待連接表中,有一個表非常大,而另一個表非常小,以至于小表可以直接存放到內(nèi)存中。這樣,我們可以將小表復(fù)制多份,讓每個map task內(nèi)存中存在一份(比如存放到hash table中),然后只掃描大表:對于大表中的每一條記錄key/value,在hash table中查找是否有相同的key的記錄,如果有,則連接后輸出即可。

為了支持文件的復(fù)制,Hadoop提供了一個類DistributedCache,使用該類的方法如下:

(1)用戶使用靜態(tài)方法DistributedCache.addCacheFile()指定要復(fù)制的文件,它的參數(shù)是文件的URI(如果是HDFS上的文件,可以這樣:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口號)。JobTracker在作業(yè)啟動之前會獲取這個URI列表,并將相應(yīng)的文件拷貝到各個TaskTracker的本地磁盤上。(2)用戶使用DistributedCache.getLocalCacheFiles()方法獲取文件目錄,并使用標(biāo)準(zhǔn)的文件讀寫API讀取相應(yīng)的文件。

REF:hadoop join之map side join

http://blog.csdn.net/huashetianzu/article/details/7821674

2.3 Semi Join

Semi Join,也叫半連接,是從分布式數(shù)據(jù)庫中借鑒過來的方法。它的產(chǎn)生動機(jī)是:對于reduce side join,跨機(jī)器的數(shù)據(jù)傳輸量非常大,這成了join操作的一個瓶頸,如果能夠在map端過濾掉不會參加join操作的數(shù)據(jù),則可以大大節(jié)省網(wǎng)絡(luò)IO。

實現(xiàn)方法很簡單:選取一個小表,假設(shè)是File1,將其參與join的key抽取出來,保存到文件File3中,File3文件一般很小,可以放到內(nèi)存中。在map階段,使用DistributedCache將File3復(fù)制到各個TaskTracker上,然后將File2中不在File3中的key對應(yīng)的記錄過濾掉,剩下的reduce階段的工作與reduce side join相同。

更多關(guān)于半連接的介紹,可參考:半連接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

REF:hadoop join之semi join

http://blog.csdn.net/huashetianzu/article/details/7823326

2.4 reduce side join + BloomFilter

在某些情況下,SemiJoin抽取出來的小表的key集合在內(nèi)存中仍然存放不下,這時候可以使用BloomFiler以節(jié)省空間。

BloomFilter最常見的作用是:判斷某個元素是否在一個集合里面。它最重要的兩個方法是:add() 和contains()。最大的特點是不會存在 false negative,即:如果contains()返回false,則該元素一定不在集合中,但會存在一定的?false positive,即:如果contains()返回true,則該元素一定可能在集合中。

因而可將小表中的key保存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關(guān)系,只不過增加了少量的網(wǎng)絡(luò)IO而已。

更多關(guān)于BloomFilter的介紹,可參考:http://blog.csdn.net/jiaomeng/article/details/1495500

3. 二次排序

在Hadoop中,默認(rèn)情況下是按照key進(jìn)行排序,如果要按照value進(jìn)行排序怎么辦?即:對于同一個key,reduce函數(shù)接收到的value list是按照value排序的。這種應(yīng)用需求在join操作中很常見,比如,希望相同的key中,小表對應(yīng)的value排在前面。

有兩種方法進(jìn)行二次排序,分別為:buffer and in memory sort和 value-to-key conversion。

對于buffer and in memory sort,主要思想是:在reduce()函數(shù)中,將某個key對應(yīng)的所有value保存下來,然后進(jìn)行排序。 這種方法最大的缺點是:可能會造成out of memory。

對于value-to-key conversion,主要思想是:將key和部分value拼接成一個組合key(實現(xiàn)WritableComparable接口或者調(diào)用setSortComparatorClass函數(shù)),這樣reduce獲取的結(jié)果便是先按key排序,后按value排序的結(jié)果,需要注意的是,用戶需要自己實現(xiàn)Paritioner,以便只按照key進(jìn)行數(shù)據(jù)劃分。Hadoop顯式的支持二次排序,在Configuration類中有個setGroupingComparatorClass()方法,可用于設(shè)置排序group的key值,具體參考:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html

4. 后記

最近一直在找工作,由于簡歷上寫了熟悉Hadoop,所以幾乎每個面試官都會問一些Hadoop相關(guān)的東西,而 Hadoop上Join的實現(xiàn)就成了一道必問的問題,而極個別公司還會涉及到DistributedCache原理以及怎樣利用DistributedCache進(jìn)行Join操作。為了更好地應(yīng)對這些面試官,特整理此文章。

?

5. 參考資料

(1) 書籍《Data-Intensive Text Processing with MapReduce》 page 60~67 Jimmy Lin and Chris Dyer,University of Maryland, College Park

(2) 書籍《Hadoop In Action》page 107~131

(3) mapreduce的二次排序 SecondarySort:http://www.cnblogs.com/xuxm2007/archive/2011/09/03/2165805.html

(4) 半連接介紹:http://wenku.baidu.com/view/ae7442db7f1922791688e877.html

(5) BloomFilter介紹:http://blog.csdn.net/jiaomeng/article/details/1495500

(6)本文來自:http://dongxicheng.org/mapreduce/hadoop-join-two-tables/

————————————————————————————————————————————————

看完了上面的 hadoop 中 MR 常規(guī) join 思路,下面我們來看一種比較極端的例子,大表 join 小表,而小表的大小在 5M 以下的情況:

之所以我這里說小表要限制 5M 以下,是因為我這里用到的思路是 :

file-》jar-》main String configuration -》configuration?map HashMap

步驟:

1、從jar里面讀取的文件內(nèi)容以String的形式存在main方法的?configuration context 全局環(huán)境變量里

2、在map函數(shù)里讀取 context 環(huán)境變量的字符串,然后split字符串組建小表成為一個HashMap

? ? ?這樣一個大表關(guān)聯(lián)小表的例子就ok了,由于context是放在namenode上的,而namenode對內(nèi)存是有限制的,

所以你的小表文件不要太大,這樣我們可以比較的方便的利用 context 做join了。

這種方式其實就是?2.2 map side join?的一種具體實現(xiàn)而已。

Talk is cheap, show you the code~

public class Test {public static class MapperClass extendsMapper<LongWritable, Text, Text, Text> {Configuration config = null;HashSet<String> idSet = new HashSet<String>();HashMap<String, String> cityIdNameMap = new HashMap<String, String>();Map<String, String> houseTypeMap = new HashMap<String, String>();public void setup(Context context) {config = context.getConfiguration();if (config == null)return;String idStr = config.get("idStr");String[] idArr = idStr.split(",");for (String id : idArr) {idSet.add(id);}String cityIdNameStr = config.get("cityIdNameStr");String[] cityIdNameArr = cityIdNameStr.split(",");for (String cityIdName : cityIdNameArr) {cityIdNameMap.put(cityIdName.split("\t")[0],cityIdName.split("\t")[1]);}houseTypeMap.put("8", "Test");}public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String[] info = value.toString().split("\\|");String insertDate = info[InfoField.InsertDate].split(" ")[0].split("-")[0]; // date: 2012-10-01insertDate = insertDate+ info[InfoField.InsertDate].split(" ")[0].split("-")[1]; // date:201210String userID = info[InfoField.UserID]; // useridif (!idSet.contains(userID)) {return;}String disLocalID = "";String[] disLocalIDArr = info[InfoField.DisLocalID].split(",");if (disLocalIDArr.length >= 2) {disLocalID = disLocalIDArr[1];} else {try {disLocalID = disLocalIDArr[0];} catch (Exception e) {e.printStackTrace();return;}}String localValue = cityIdNameMap.get(disLocalID);disLocalID = localValue == null ? disLocalID : localValue; // cityString[] cateIdArr = info[InfoField.CateID].split(",");String cateId = "";String secondType = "";if (cateIdArr.length >= 3) {cateId = cateIdArr[2];if (houseTypeMap.get(cateId) != null) {secondType = houseTypeMap.get(cateId); // secondType} else {return;}} else {return;}String upType = info[InfoField.UpType];String outKey = insertDate + "_" + userID + "_" + disLocalID + "_"+ secondType;String outValue = upType.equals("0") ? "1_1" : "1_0";context.write(new Text(outKey), new Text(outValue));}}public static class ReducerClass extendsReducer<Text, Text, NullWritable, Text> {public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException {int pv = 0;int uv = 0;for (Text val : values) {String[] tmpArr = val.toString().split("_");pv += Integer.parseInt(tmpArr[0]);uv += Integer.parseInt(tmpArr[1]);}String outValue = key + "_" + pv + "_" + uv;context.write(NullWritable.get(), new Text(outValue));}}public String getResource(String fileFullName) throws IOException {// 返回讀取指定資源的輸入流InputStream is = this.getClass().getResourceAsStream(fileFullName);BufferedReader br = new BufferedReader(new InputStreamReader(is));String s = "";String res = "";while ((s = br.readLine()) != null)res = res.equals("") ? s : res + "," + s;return res;}public static void main(String[] args) throws IOException,InterruptedException, ClassNotFoundException {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length != 2) {System.exit(2);}String idStr = new Test().getResource("userIDList.txt");String cityIdNameStr = new Test().getResource("cityIdName.txt");conf.set("idStr", idStr);conf.set("cityIdNameStr", cityIdNameStr);Job job = new Job(conf, "test01");// job.setInputFormatClass(TextInputFormat.class);job.setJarByClass(Test.class);job.setMapperClass(Test.MapperClass.class);job.setReducerClass(Test.ReducerClass.class);job.setNumReduceTasks(25);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);FileInputFormat.addInputPath(job, new Path(otherArgs[0]));org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }

說明:

1、getResource() 方法指定了可以從jar包中讀取配置文件,并拼接成一個String返回。

2、setup() 方法起到一個mapreduce前的初始化的工作,他的作用是從 context 中

獲取main中存入的配置文件字符串,并用來構(gòu)建一個hashmap,放在map外面,

每個node上MR前只被執(zhí)行一次。

3、注意上面代碼的第 125、126 行,conf.set(key, value) 中的 value 大小是由限制的,

在 0.20.x 版本中是 5M 的大小限制,如果大于此大小建議采用分布式緩存讀文件的策略。

參考:解決 hadoop jobconf 限制為5M的問題

http://my.oschina.net/132722/blog/174601

?

推薦閱讀:

?

使用HBase的MAP側(cè)聯(lián)接

?http://blog.sina.com.cn/s/blog_ae33b83901016lkq.html?

?

PS:關(guān)于如何從jar包中讀取配置文件,請參考:

(1)深入jar包:從jar包中讀取資源文件?? ? ?

? ? ?http://www.iteye.com/topic/483115

(2)讀取jar內(nèi)資源文件? ? ?

? ? ?http://heipark.iteye.com/blog/1439114

(3)Java相對路徑讀取資源文件?? ?

? ? ? ? ?http://lavasoft.blog.51cto.com/62575/265821/

(4)Java加載資源文件時的路徑問題???

? ? ? ? ?http://www.cnblogs.com/lmtoo/archive/2012/10/18/2729272.html

? ? ? ? ?如何優(yōu)雅讀取properties文件

? ? ? ? ?http://blogread.cn/it/article/3262?f=wb

注意:

不能先?getResource() ?獲取路徑然后讀取內(nèi)容,

因為".../ResourceJar.jar!/resource/...."并不是文件資源定位符的格式。

所以,如果jar包中的類源代碼用File f=new File(相對路徑);的形式,是不可能定位到文件資源的。

這也是為什么源代碼打包成jar文件后,調(diào)用jar包時會報出FileNotFoundException的癥結(jié)所在了。

但可以通過Class類的getResourceAsStream()方法來直接獲取文件內(nèi)容?,

這種方法是如何讀取jar中的資源文件的,這一點對于我們來說是透明的。

而且?getResource() 和?getResourceAsStream() 在 maven 項目下對于相對、絕對路徑的尋找規(guī)則貌似還不一樣:

System.out.println(QQWryFile.class.getResource("/qqwry.dat").getFile());?

System.out.println(QQWryFile.class.getClassLoader().getResourceAsStream("/qqwry.dat"));
System.out.println(QQWryFile.class.getClassLoader().getResourceAsStream("qqwry.dat"));

System.out.println(QQWryFile.class.getResourceAsStream("/qqwry.dat"));
System.out.println(QQWryFile.class.getResourceAsStream("qqwry.dat"));

TIPS:Class和ClassLoader的getResourceAsStream()方法的區(qū)別:

這兩個方法還是略有區(qū)別的, 以前一直不加以區(qū)分,直到今天發(fā)現(xiàn)要寫這樣的代碼的時候運(yùn)行?
錯誤, 才把這個問題澄清了一下。?

基本上,兩個都可以用于從 classpath 里面進(jìn)行資源讀取, ?classpath包含classpath中的路徑?
和classpath中的jar。?

兩個方法的區(qū)別是資源的定義不同, 一個主要用于相對與一個object取資源,而另一個用于取相對于classpath的?
資源,用的是絕對路徑。?

在使用Class.getResourceAsStream 時, 資源路徑有兩種方式, 一種以 / 開頭,則這樣的路徑是指定絕對?
路徑, 如果不以 / 開頭, 則路徑是相對與這個class所在的包的。?

在使用ClassLoader.getResourceAsStream時, 路徑直接使用相對于classpath的絕對路徑。?

舉例,下面的三個語句,實際結(jié)果是一樣的:

com.explorers.Test.class.getResourceAsStream("abc.jpg") = com.explorers.Test.class.getResourceAsStream("/com/explorers/abc.jpg") = ClassLoader.getResourceAsStream("com/explorers/abc.jpg")

http://macrochen.iteye.com/blog/293918

http://blogread.cn/it/article/3262?f=wb

轉(zhuǎn)發(fā):https://my.oschina.net/leejun2005/blog/95186

總結(jié)

以上是生活随笔為你收集整理的MapReduce 中的两表 join 几种方案简介的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。