Practical MapReduce Examples


Reduce-side join implementation

1. Requirement

Order table t_order:

id      date        pid     amount
1001    20150710    P0001   2
1002    20150710    P0001   3
1002    20150710    P0002   3

Product table t_product:

id      pname    category_id   price
P0001   小米5     1000          2
P0002   錘子T1    1000          3

Suppose the data volume is huge and both tables are stored as files in HDFS. We need a MapReduce program to implement the following SQL query:

select a.id, a.date, b.pname, b.category_id, b.price from t_order a join t_product b on a.pid = b.id

2. Implementation mechanism

Use the join condition as the map output key, so that records from both tables that satisfy the join condition, each tagged with the file it came from, are sent to the same reduce task; the actual joining of the data is then performed in the reducer.

public class OrderJoin {

    static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Read one line and determine which file it comes from
            String line = value.toString();
            String[] fields = line.split("\t");
            // Extract the item id (the join key)
            String itemid = fields[0];
            // Get the name of the file this split belongs to (via the InputSplit)
            String name = ((FileSplit) context.getInputSplit()).getPath().getName();
            // Depending on the file name, parse the fields
            // (an order line yields two fields, a product line yields three fields)
            OrderJoinBean bean = new OrderJoinBean();
            bean.set(null, null, null, null, null);
            context.write(new Text(itemid), bean);
        }
    }

    static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {
            // The key is one itemid, e.g. 1000
            // The beans come from the two kinds of files:
            // {1000,amount} {1000,amount} {1000,amount} --- {1000,price,name}
            // Concatenate the fields of the product-file bean with every order-file bean and write the results out
        }
    }
}
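The OrderJoinBean used above is not shown in the original post. A minimal sketch of what it might look like follows; the field set and the "flag" column marking the source table are assumptions for illustration, not the author's definition.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Hypothetical sketch of OrderJoinBean
public class OrderJoinBean implements Writable {
    private Text id = new Text();
    private Text date = new Text();
    private Text pid = new Text();
    private Text amount = new Text();
    private Text flag = new Text();   // e.g. "order" or "product", so the reducer knows the source table

    public void set(String id, String date, String pid, String amount, String flag) {
        this.id.set(id == null ? "" : id);
        this.date.set(date == null ? "" : date);
        this.pid.set(pid == null ? "" : pid);
        this.amount.set(amount == null ? "" : amount);
        this.flag.set(flag == null ? "" : flag);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        id.write(out);
        date.write(out);
        pid.write(out);
        amount.write(out);
        flag.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id.readFields(in);
        date.readFields(in);
        pid.readFields(in);
        amount.readFields(in);
        flag.readFields(in);
    }
}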

Drawback: with this approach the join is done in the reduce phase, so the reduce side carries most of the processing load while the map side does very little work. Resource utilization is poor, and the reduce phase is highly prone to data skew.

Solution: map-side join (mapjoin)

1. Principle

Suitable when one of the tables being joined is small.

The small table can be distributed to every map node, so each map task can join the portion of the big table it reads locally and emit the final result. This greatly increases the parallelism of the join and speeds up processing.

2. Implementation example

-- First pre-load the small table inside the Mapper class, then join against it there.

-- In a real scenario, load the small table either from a database once at setup time, or via the DistributedCache.

public class TestDistributedCache {

    static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text> {

        FileReader in = null;
        BufferedReader reader = null;
        HashMap<String, String> b_tab = new HashMap<String, String>();
        String localpath = null;

        // setup() is called once when the map task initializes
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // These calls return the local absolute path of the cache file (useful for testing/verification)
            Path[] files = context.getLocalCacheFiles();
            localpath = files[0].toString();
            URI[] cacheFiles = context.getCacheFiles();

            // Using the cached file: read it with plain local file IO.
            // The file read here is a small file in the local working directory of the machine running this map task.
            in = new FileReader("b.txt");
            reader = new BufferedReader(in);
            String line = null;
            while (null != (line = reader.readLine())) {
                String[] fields = line.split(",");
                b_tab.put(fields[0], fields[1]);
            }
            IOUtils.closeStream(reader);
            IOUtils.closeStream(in);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Here we read the input split (on HDFS) that this map task is responsible for
            String[] fields = value.toString().split("\t");
            String a_itemid = fields[0];
            String a_amount = fields[1];
            String b_name = b_tab.get(a_itemid);
            // Example output: 1001  98.9  banana
            context.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" + b_name));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(TestDistributedCache.class);
        job.setMapperClass(TestDistributedCacheMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);   // the mapper emits Text values

        // Paths of the normal input data to process
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // No reducer needed
        job.setNumReduceTasks(0);

        // Distribute a file to the working directory of every task process
        job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));

        // Distribute an archive to the classpath of every task process
        // job.addArchiveToClassPath(archive);
        // Distribute a jar to the classpath of every task node
        // job.addFileToClassPath(jarfile);

        job.waitForCompletion(true);
    }
}
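For this sketch to run, the cached b.txt would need to contain comma-separated key/value lines, hypothetically something like 1001,banana (matching the example output comment above), since setup() splits each line on "," and stores fields[0] -> fields[1] in the in-memory map.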

Web log preprocessing

1. Requirement:

Identify and split the individual fields in each web access log record.

Remove invalid records from the log.

Generate the various filtered access-request datasets required by the KPI statistics.

2. Implementation code:

a) Define a bean that holds the fields of a log record

public class WebLogBean {

    private String remote_addr;      // client IP address
    private String remote_user;      // client user name; "-" means not provided
    private String time_local;       // access time and time zone
    private String request;          // requested URL and HTTP protocol
    private String status;           // request status; 200 means success
    private String body_bytes_sent;  // size of the response body sent to the client
    private String http_referer;     // the page the request was linked from
    private String http_user_agent;  // information about the client browser
    private boolean valid = true;    // whether this record is valid

    public String getRemote_addr() { return remote_addr; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }

    public String getRemote_user() { return remote_user; }
    public void setRemote_user(String remote_user) { this.remote_user = remote_user; }

    public String getTime_local() { return time_local; }
    public void setTime_local(String time_local) { this.time_local = time_local; }

    public String getRequest() { return request; }
    public void setRequest(String request) { this.request = request; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    public String getBody_bytes_sent() { return body_bytes_sent; }
    public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }

    public String getHttp_referer() { return http_referer; }
    public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }

    public String getHttp_user_agent() { return http_user_agent; }
    public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }

    public boolean isValid() { return valid; }
    public void setValid(boolean valid) { this.valid = valid; }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.remote_addr);
        sb.append("\001").append(this.remote_user);
        sb.append("\001").append(this.time_local);
        sb.append("\001").append(this.request);
        sb.append("\001").append(this.status);
        sb.append("\001").append(this.body_bytes_sent);
        sb.append("\001").append(this.http_referer);
        sb.append("\001").append(this.http_user_agent);
        return sb.toString();
    }
}

b) Define a parser that parses and filters raw web access log records

public class WebLogParser {

    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            webLogBean.setTime_local(arr[3].substring(1));
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);

            if (arr.length > 12) {
                webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }

            // status >= 400 indicates an HTTP error
            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
                webLogBean.setValid(false);
            }
        } else {
            webLogBean.setValid(false);
        }
        return webLogBean;
    }

    public static String parserTime(String time) {
        // String.replace returns a new string, so return it directly
        return time.replace("/", "-");
    }
}
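The parserTime helper above only swaps slashes for dashes. If the log uses the common nginx/Apache time format (an assumption, e.g. 18/Sep/2013:06:49:18, which is not stated in the original post), a fuller conversion sketch with SimpleDateFormat could look like this:

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

public class TimeConvert {
    // Converts e.g. "18/Sep/2013:06:49:18" to "2013-09-18 06:49:18".
    // The input pattern is an assumption about the log format.
    public static String toStandard(String time) throws ParseException {
        SimpleDateFormat in = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        SimpleDateFormat out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return out.format(in.parse(time));
    }
}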

c) The MapReduce driver program

public class WeblogPreProcess {

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text k = new Text();
        NullWritable v = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            WebLogBean webLogBean = WebLogParser.parser(line);
            // Drop invalid records
            if (!webLogBean.isValid())
                return;
            k.set(webLogBean.toString());
            context.write(k, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WeblogPreProcess.class);
        job.setMapperClass(WeblogPreProcessMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

Traffic statistics requirements

1. For each user in the traffic log, compute the total upstream and downstream traffic. Key technique: a custom Java bean used as the value in MapReduce.

Note: the Java bean must implement the Writable interface and override two methods:

// Serialization: write the object's fields to the output stream
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(upflow);
    out.writeLong(downflow);
    out.writeLong(sumflow);
}

// Deserialization: read the fields back from the input stream, in the same order they were written
@Override
public void readFields(DataInput in) throws IOException {
    upflow = in.readLong();
    downflow = in.readLong();
    sumflow = in.readLong();
}
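For context, a minimal FlowBean sketch built around these two methods might look like the following; the field names follow the snippet above, while the constructors and toString() are assumptions for illustration.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// Hypothetical sketch -- the original post only shows write()/readFields()
public class FlowBean implements Writable {
    private long upflow;
    private long downflow;
    private long sumflow;

    // Hadoop needs a no-argument constructor to instantiate the bean during deserialization
    public FlowBean() {}

    public FlowBean(long upflow, long downflow) {
        this.upflow = upflow;
        this.downflow = downflow;
        this.sumflow = upflow + downflow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upflow);
        out.writeLong(downflow);
        out.writeLong(sumflow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        upflow = in.readLong();
        downflow = in.readLong();
        sumflow = in.readLong();
    }

    @Override
    public String toString() {
        return upflow + "\t" + downflow + "\t" + sumflow;
    }
}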

2. Compute the traffic and sort the results by total traffic in descending order.

Key technique: this is hard to do with a single MapReduce job; it takes two jobs.

The first job aggregates the traffic, exactly as in the previous requirement.

The second job reads the output of the first job and sorts it.

Emit the FlowBean as the map output key, so that MapReduce sorts by it automatically. For this, FlowBean must implement the WritableComparable interface and its compareTo() method, in which we define the descending-order comparison logic, as sketched below.
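A sketch of a descending compareTo() for this purpose, assuming the bean implements WritableComparable<FlowBean> and sumflow is the sort field:

// Returning -1 when this bean's sumflow is larger puts larger totals first (descending order)
@Override
public int compareTo(FlowBean other) {
    return this.sumflow > other.sumflow ? -1 : 1;
}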

3. Compute the traffic and, based on the phone number's home province, write the results to a separate output file per province. Key technique: a custom Partitioner.

@Override
public int getPartition(Text key, FlowBean value, int numPartitions) {
    // Look up the partition number by the first 3 digits of the phone number
    String prefix = key.toString().substring(0, 3);
    Integer partNum = pmap.get(prefix);
    // Unknown prefixes all fall into partition 4
    return (partNum == null ? 4 : partNum);
}
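The pmap referenced above is not shown in the original; it could be, hypothetically, a static map (java.util.HashMap) from phone-number prefixes to partition numbers defined inside the partitioner class, for example:

// Hypothetical sketch -- the prefixes and their partition assignments are made-up placeholders
private static HashMap<String, Integer> pmap = new HashMap<String, Integer>();
static {
    pmap.put("136", 0);
    pmap.put("137", 1);
    pmap.put("138", 2);
    pmap.put("139", 3);
    // every other prefix falls through to partition 4 in getPartition()
}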

After defining a custom Partitioner, set the number of reduce tasks to match the partitioning logic:

job.setNumReduceTasks(5);

Note: if the number of reduce tasks is greater than the number of partitions returned by getPartition, a few empty output files part-r-000xx will be produced.

If 1 < number of reduce tasks < number of partitions, some partitions have nowhere to go and the job throws an Exception.

If the number of reduce tasks is 1, then no matter how many partitions the map side produces, everything goes to that single reduce task and only one result file, part-r-00000, is produced.

Social friend (follower) data analysis

Below is QQ friend-list data. Before the colon is a user; after the colon are all of that user's friends (the friendship relation in this data is one-directional).

A:B,C,D,F,E,O

B:A,C,E,K

C:F,A,D,I

D:A,E,F,L

E:B,C,D,M,L

F:A,B,C,D,E,O,M

G:A,C,D,E,F

H:A,C,D,E,O

I:A,O

J:B,O

K:A,C,D

L:D,E,F

M:E,F,G

O:A,H,I,J

Question: which pairs of users have common friends, and who are those common friends?

Solution approach:


Step 1

map

Read a line: A:B,C,D,F,E,O
Emit: <B,A> <C,A> <D,A> <F,A> <E,A> <O,A>

Read the next line: B:A,C,E,K
Emit: <A,B> <C,B> <E,B> <K,B>

reduce

The reducer receives, for example: <C,A> <C,B> <C,E> <C,F> <C,G> ...

Emit:

<A-B,C>
<A-E,C>
<A-F,C>
<A-G,C>
<B-E,C>
<B-F,C>
...

Step 2

map

Read a line: <A-B,C>
Emit it unchanged: <A-B,C>

reduce

Receive: <A-B,C> <A-B,F> <A-B,G> ...
Emit: A-B  C,F,G,...


package cn.itcast.bigdata.mr.fensi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SharedFriendsStepOne {

    static class SharedFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input line, e.g.: A:B,C,D,F,E,O
            String line = value.toString();
            String[] person_friends = line.split(":");
            String person = person_friends[0];
            String friends = person_friends[1];

            for (String friend : friends.split(",")) {
                // Emit <friend, person>
                context.write(new Text(friend), new Text(person));
            }
        }
    }

    static class SharedFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text person : persons) {
                sb.append(person).append(",");
            }
            context.write(friend, new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SharedFriendsStepOne.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SharedFriendsStepOneMapper.class);
        job.setReducerClass(SharedFriendsStepOneReducer.class);

        FileInputFormat.setInputPaths(job, new Path("D:/srcdata/friends"));
        FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));

        job.waitForCompletion(true);
    }
}

package cn.itcast.bigdata.mr.fensi;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SharedFriendsStepTwo {

    static class SharedFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {

        // The input is the output of step one, e.g.:
        // A	I,K,C,B,G,F,H,O,D,
        // friend	person,person,person
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] friend_persons = line.split("\t");

            String friend = friend_persons[0];
            String[] persons = friend_persons[1].split(",");

            Arrays.sort(persons);

            for (int i = 0; i < persons.length - 1; i++) {
                for (int j = i + 1; j < persons.length; j++) {
                    // Emit <person-person, friend>, so all common friends of the same
                    // "person-person" pair end up in the same reduce call
                    context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));
                }
            }
        }
    }

    static class SharedFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text person_person, Iterable<Text> friends, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text friend : friends) {
                sb.append(friend).append(" ");
            }
            context.write(person_person, new Text(sb.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SharedFriendsStepTwo.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SharedFriendsStepTwoMapper.class);
        job.setReducerClass(SharedFriendsStepTwoReducer.class);

        FileInputFormat.setInputPaths(job, new Path("D:/temp/out/part-r-00000"));
        FileOutputFormat.setOutputPath(job, new Path("D:/temp/out2"));

        job.waitForCompletion(true);
    }
}
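A note on the design: Arrays.sort(persons) in the step-two mapper guarantees that each pair is always emitted in one canonical order (always A-B, never B-A), so both orderings of the same pair land on the same reduce key. The final output of step two is one line per pair followed by its space-separated common friends; for the sample data above, the line for A-B would list C and E, since those are the friends A and B share.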

Reposted from: https://www.cnblogs.com/duan2/p/7538049.html
