日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

ChainMapper和ChainReducer

發布時間:2024/4/11 编程问答 27 豆豆
生活随笔 收集整理的這篇文章主要介紹了 ChainMapper和ChainReducer 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
The ChainMapper class allows to use multiple Mapper classes within a single Map task.?

The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task.

?

?http://www.oratea.net/?p=371

?通過ChainMapper可以將多個map類合并成一個map任務。

下面個這個例子沒什么實際意思,但是很好的演示了ChainMapper的作用。

源文件
100 tom 90
101 mary 85
102 kate 60

map00的結果,過濾掉100的記錄
101 mary 85
102 kate 60

map01的結果,過濾掉101的記錄
102 kate 60

reduce結果
102 kate 60

?package?org.myorg;


import?java.io.IOException;
import?java.util.*;
import?java.lang.String;

import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.conf.*;
import?org.apache.hadoop.io.*;
import?org.apache.hadoop.mapred.*;
import?org.apache.hadoop.util.*;
import?org.apache.hadoop.mapred.lib.*;

public?class?WordCount
{

????
public?static?class?Map00?extends?MapReduceBase?implements?Mapper
????{

????????
public?void?map(Text?key,?Text?value,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{

????????????Text?ft?
=?new?Text(“100″);

????????????
if(!key.equals(ft))
????????????{
????????????????output.collect(key,?value);
????????????}
????????}
????}

????
public?static?class?Map01?extends?MapReduceBase?implements?Mapper
????{

????????
public?void?map(Text?key,?Text?value,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{

????????????Text?ft?
=?new?Text(“101″);

????????????
if(!key.equals(ft))
????????????{
????????????????output.collect(key,?value);
????????????}
????????}
????}

????
public?static?class?Reduce?extends?MapReduceBase?implements?Reducer
????{
????????
public?void?reduce(Text?key,?Iterator?values,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{

????????????
while(values.hasNext())
????????????{
????????????????output.collect(key,?values.next());
????????????}

????????}
????}

????
public?static?void?main(String[]?args)?throws?Exception
????{

????????JobConf?conf?
=?new?JobConf(WordCount.class);
????????conf.setJobName(“wordcount00″);

????????conf.setInputFormat(KeyValueTextInputFormat.
class);
????????conf.setOutputFormat(TextOutputFormat.
class);

????????ChainMapper?cm?
=?new?ChainMapper();

????????JobConf?mapAConf?
=?new?JobConf(false);
????????cm.addMapper(conf,?Map00.
class,?Text.class,?Text.class,?Text.class,?Text.class,?true,?mapAConf);

????????JobConf?mapBConf?
=?new?JobConf(false);
????????cm.addMapper(conf,?Map01.
class,?Text.class,?Text.class,?Text.class,?Text.class,?true,?mapBConf);

????????conf.setReducerClass(Reduce.
class);

????????conf00.setOutputKeyClass(Text.
class);
????????conf00.setOutputValueClass(Text.
class);

????????FileInputFormat.setInputPaths(conf,?
new?Path(args[0]));
????????FileOutputFormat.setOutputPath(conf,?
new?Path(args[1]));

????????JobClient.runJob(conf);

????}
}
?

?

另外一個例子,代碼很多,其實很簡單,Conn幾個類都是相同的

http://yixiaohuamax.iteye.com/blog/684244?

package?com.oncedq.code;

import?java.io.DataInput;
import?java.io.DataOutput;
import?java.io.IOException;
import?java.text.SimpleDateFormat;

import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.LongWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.io.WritableComparable;
import?org.apache.hadoop.mapred.FileInputFormat;
import?org.apache.hadoop.mapred.FileOutputFormat;
import?org.apache.hadoop.mapred.JobConf;
import?org.apache.hadoop.mapred.MapReduceBase;
import?org.apache.hadoop.mapred.Mapper;
import?org.apache.hadoop.mapred.OutputCollector;
import?org.apache.hadoop.mapred.Reporter;
import?org.apache.hadoop.mapred.TextInputFormat;
import?org.apache.hadoop.mapred.TextOutputFormat;
import?org.apache.hadoop.mapred.jobcontrol.Job;
import?org.apache.hadoop.mapred.jobcontrol.JobControl;
import?org.apache.hadoop.mapred.lib.ChainMapper;

import?com.oncedq.code.util.DateUtil;

public?class?ProcessSample?{
????
public?static?class?ExtractMappper?extends?MapReduceBase?implements
????????????Mapper
<LongWritable,?Text,?LongWritable,?Conn1>?{

????????@Override
????????
public?void?map(LongWritable?arg0,?Text?arg1,
????????????????OutputCollector
<LongWritable,?Conn1>?arg2,?Reporter?arg3)
????????????????
throws?IOException?{
????????????String?line?
=?arg1.toString();
????????????String[]?strs?
=?line.split(";");
????????????Conn1?conn1?
=?new?Conn1();
????????????conn1.orderKey?
=?Long.parseLong(strs[0]);
????????????conn1.customer?
=?Long.parseLong(strs[1]);
????????????conn1.state?
=?strs[2];
????????????conn1.price?
=?Double.parseDouble(strs[3]);
????????????conn1.orderDate?
=?DateUtil.getDateFromString(strs[4],?"yyyy-MM-dd");
????????????LongWritable?lw?
=?new?LongWritable(conn1.orderKey);
????????????arg2.collect(lw,?conn1);
????????}

????}

????
private?static?class?Conn1?implements?WritableComparable<Conn1>?{
????????
public?long?orderKey;
????????
public?long?customer;
????????
public?String?state;
????????
public?double?price;
????????
public?java.util.Date?orderDate;

????????@Override
????????
public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?
=?in.readLong();
????????????customer?
=?in.readLong();
????????????state?
=?Text.readString(in);
????????????price?
=?in.readDouble();
????????????orderDate?
=?DateUtil.getDateFromString(Text.readString(in),
????????????????????
"yyyy-MM-dd");
????????}

????????@Override
????????
public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?
"yyyy-MM-dd"));
????????}

????????@Override
????????
public?int?compareTo(Conn1?arg0)?{
????????????
//?TODO?Auto-generated?method?stub
????????????return?0;
????????}

????}

????
public?static?class?Filter1Mapper?extends?MapReduceBase?implements
????????????Mapper
<LongWritable,?Conn1,?LongWritable,?Conn2>?{

????????@Override
????????
public?void?map(LongWritable?inKey,?Conn1?c2,
????????????????OutputCollector
<LongWritable,?Conn2>?collector,?Reporter?report)
????????????????
throws?IOException?{
????????????
if?(c2.state.equals("F"))?{
????????????????Conn2?inValue?
=?new?Conn2();
????????????????inValue.customer?
=?c2.customer;
????????????????inValue.orderDate?
=?c2.orderDate;
????????????????inValue.orderKey?
=?c2.orderKey;
????????????????inValue.price?
=?c2.price;
????????????????inValue.state?
=?c2.state;
????????????????collector.collect(inKey,?inValue);
????????????}
????????}

????}

????
private?static?class?Conn2?implements?WritableComparable<Conn1>?{
????????
public?long?orderKey;
????????
public?long?customer;
????????
public?String?state;
????????
public?double?price;
????????
public?java.util.Date?orderDate;

????????@Override
????????
public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?
=?in.readLong();
????????????customer?
=?in.readLong();
????????????state?
=?Text.readString(in);
????????????price?
=?in.readDouble();
????????????orderDate?
=?DateUtil.getDateFromString(Text.readString(in),
????????????????????
"yyyy-MM-dd");
????????}

????????@Override
????????
public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?
"yyyy-MM-dd"));
????????}

????????@Override
????????
public?int?compareTo(Conn1?arg0)?{
????????????
//?TODO?Auto-generated?method?stub
????????????return?0;
????????}

????}

????
public?static?class?RegexMapper?extends?MapReduceBase?implements
????????????Mapper
<LongWritable,?Conn2,?LongWritable,?Conn3>?{

????????@Override
????????
public?void?map(LongWritable?inKey,?Conn2?c3,
????????????????OutputCollector
<LongWritable,?Conn3>?collector,?Reporter?report)
????????????????
throws?IOException?{
????????????c3.state?
=?c3.state.replaceAll("F",?"Find");
????????????Conn3?c2?
=?new?Conn3();
????????????c2.customer?
=?c3.customer;
????????????c2.orderDate?
=?c3.orderDate;
????????????c2.orderKey?
=?c3.orderKey;
????????????c2.price?
=?c3.price;
????????????c2.state?
=?c3.state;
????????????collector.collect(inKey,?c2);
????????}
????}

????
private?static?class?Conn3?implements?WritableComparable<Conn1>?{
????????
public?long?orderKey;
????????
public?long?customer;
????????
public?String?state;
????????
public?double?price;
????????
public?java.util.Date?orderDate;

????????@Override
????????
public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?
=?in.readLong();
????????????customer?
=?in.readLong();
????????????state?
=?Text.readString(in);
????????????price?
=?in.readDouble();
????????????orderDate?
=?DateUtil.getDateFromString(Text.readString(in),
????????????????????
"yyyy-MM-dd");
????????}

????????@Override
????????
public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?
"yyyy-MM-dd"));
????????}

????????@Override
????????
public?int?compareTo(Conn1?arg0)?{
????????????
//?TODO?Auto-generated?method?stub
????????????return?0;
????????}

????}

????
public?static?class?LoadMapper?extends?MapReduceBase?implements
????????????Mapper
<LongWritable,?Conn3,?LongWritable,?Conn3>?{

????????@Override
????????
public?void?map(LongWritable?arg0,?Conn3?arg1,
????????????????OutputCollector
<LongWritable,?Conn3>?arg2,?Reporter?arg3)
????????????????
throws?IOException?{
????????????arg2.collect(arg0,?arg1);
????????}

????}

????
public?static?void?main(String[]?args)?{
????????JobConf?job?
=?new?JobConf(ProcessSample.class);
????????job.setJobName(
"ProcessSample");
????????job.setNumReduceTasks(
0);
????????job.setInputFormat(TextInputFormat.
class);
????????job.setOutputFormat(TextOutputFormat.
class);
????????JobConf?mapper1?
=?new?JobConf();
????????JobConf?mapper2?
=?new?JobConf();
????????JobConf?mapper3?
=?new?JobConf();
????????JobConf?mapper4?
=?new?JobConf();
????????ChainMapper?cm?
=?new?ChainMapper();
????????cm.addMapper(job,?ExtractMappper.
class,?LongWritable.class,?Text.class,
????????????????LongWritable.
class,?Conn1.class,?true,?mapper1);
????????cm.addMapper(job,?Filter1Mapper.
class,?LongWritable.class,?Conn1.class,
????????????????LongWritable.
class,?Conn2.class,?true,?mapper2);
????????cm.addMapper(job,?RegexMapper.
class,?LongWritable.class,?Conn2.class,
????????????????LongWritable.
class,?Conn3.class,?true,?mapper3);
????????cm.addMapper(job,?LoadMapper.
class,?LongWritable.class,?Conn3.class,
????????????????LongWritable.
class,?Conn3.class,?true,?mapper4);
????????FileInputFormat.setInputPaths(job,?
new?Path("orderData"));
????????FileOutputFormat.setOutputPath(job,?
new?Path("orderDataOutput"));
????????Job?job1;
????????
try?{
????????????job1?
=?new?Job(job);
????????????JobControl?jc?
=?new?JobControl("test");
????????????jc.addJob(job1);
????????????jc.run();
????????}?
catch?(IOException?e)?{
????????????
//?TODO?Auto-generated?catch?block
????????????e.printStackTrace();
????????}

????}
}

總結

以上是生活随笔為你收集整理的ChainMapper和ChainReducer的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。