Partitioning in MapReduce


After sitting through 超哥's lecture I finally understood partitioning, so I am writing down my own understanding here. (Thanks, 超哥!)

package partition;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @ClassName: FlowCount2
 * @Description: Sums per-phone traffic and writes the results through a custom partitioner.
 * @author zhangweixiang
 * @date 2014-03-06 15:27:56
 */
/**
 * The partition example must be packaged as a jar to run.
 * Uses:
 * 1. Produce multiple output files according to business needs.
 * 2. Run multiple reduce tasks, improving the overall efficiency of the job.
 */
public class FlowCount2 {
	public static final String INPUT_PATH = "hdfs://192.168.0.9:9000/wlan2";
	public static final String OUT_PATH = "hdfs://192.168.0.9:9000/myout";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf, FlowCount2.class.getSimpleName());
		// Specify the jar this job ships in
		job.setJarByClass(FlowCount2.class);
		// 1.1 Specify the input path
		FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
		// Specify the input format class
		job.setInputFormatClass(TextInputFormat.class);
		// 1.2 Specify the custom map class
		job.setMapperClass(MyMapper.class);
		// Set the map output types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(FlowWritable.class);
		// 1.3 Specify the partitioner
		job.setPartitionerClass(MyPartition.class);
		// Set the number of reduce tasks: the map output is split into two
		// partitions, so two reduce tasks should write to two different files
		// (one partition corresponds to one reduce task)
		job.setNumReduceTasks(2);
		// 1.4 sorting, grouping
		// 1.5 combining
		// 2.2 Specify the custom reduce class
		job.setReducerClass(MyReduce.class);
		// Set the output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowWritable.class);
		// Set the output format class
		job.setOutputFormatClass(TextOutputFormat.class);
		// Delete the output path if it already exists
		FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), new Configuration());
		Path path = new Path(OUT_PATH);
		if (fileSystem.exists(path)) {
			fileSystem.delete(path, true);
		}
		// 2.3 Specify the output path
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		// Submit the job
		job.waitForCompletion(true);
	}

	static class MyMapper extends Mapper<LongWritable, Text, Text, FlowWritable> {
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Split the line into fields
			String[] split = value.toString().split("\t");
			// The user's phone number
			String mobile = "";
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			// Only emit records that carry a phone number
			if (!("".equals(split[2]))) {
				mobile = split[2];
				// Read the traffic fields
				if (!("".equals(split[21]))) {
					upPackNum = Long.parseLong(split[21]);
				}
				if (!("".equals(split[22]))) {
					downPackNum = Long.parseLong(split[22]);
				}
				if (!("".equals(split[23]))) {
					upPayLoad = Long.parseLong(split[23]);
				}
				if (!("".equals(split[24]))) {
					downPayLoad = Long.parseLong(split[24]);
				}
				FlowWritable flowWritable = new FlowWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
				context.write(new Text(mobile), flowWritable);
			}
		}
	}

	static class MyReduce extends Reducer<Text, FlowWritable, Text, FlowWritable> {
		@Override
		protected void reduce(Text k2, Iterable<FlowWritable> v2s, Context context)
				throws IOException, InterruptedException {
			long upPackNum = 0L;
			long downPackNum = 0L;
			long upPayLoad = 0L;
			long downPayLoad = 0L;
			// Sum the four counters over all records for this phone number
			for (FlowWritable flowWritable : v2s) {
				upPackNum += flowWritable.upPackNum;
				downPackNum += flowWritable.downPackNum;
				upPayLoad += flowWritable.upPayLoad;
				downPayLoad += flowWritable.downPayLoad;
			}
			FlowWritable flowWritable = new FlowWritable(upPackNum, downPackNum, upPayLoad, downPayLoad);
			context.write(k2, flowWritable);
		}
	}
}


package partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * @ClassName: FlowWritable
 * @Description: Custom type implementing the Writable interface, holding four
 *               fields (upPackNum: upstream packets, downPackNum: downstream packets,
 *               upPayLoad: upstream bytes, downPayLoad: downstream bytes)
 * @author zhangweixiang
 * @date 2014-03-05 11:37:10
 */
public class FlowWritable implements Writable {
	public long upPackNum;
	public long downPackNum;
	public long upPayLoad;
	public long downPayLoad;

	public FlowWritable() {
		// No-arg constructor required by Hadoop serialization
	}

	public FlowWritable(long upPackNum, long downPackNum, long upPayLoad, long downPayLoad) {
		this.upPackNum = upPackNum;
		this.downPackNum = downPackNum;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		// Fields must be written and read in the same order
		out.writeLong(upPackNum);
		out.writeLong(downPackNum);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.upPackNum = in.readLong();
		this.downPackNum = in.readLong();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
	}

	@Override
	public String toString() {
		return upPackNum + "\t" + downPackNum + "\t" + upPayLoad + "\t" + downPayLoad;
	}
}
package partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * @ClassName: MyPartition
 * @Description: Partition by phone number: well-formed numbers go to partition 0,
 *               everything else to partition 1. (Two partitions are created here,
 *               so two reduce tasks write to two different files, 0 and 1.)
 * @param K k2 (the map output key), V v2 (the map output value)
 * @author zhangweixiang
 * @date 2014-03-06 15:02:29
 */
public class MyPartition extends HashPartitioner<Text, FlowWritable> {
	@Override
	public int getPartition(Text key, FlowWritable value, int numReduceTasks) {
		int p = 0;
		// An 11-character key is treated as a well-formed phone number
		if (key.toString().length() != 11) {
			p = 1;
		}
		return p;
	}
}
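For contrast, the stock HashPartitioner that MyPartition extends spreads keys across reduce tasks by hash code; overriding getPartition replaces that behavior entirely, so extending the abstract Partitioner directly would work just as well. A sketch of what the default does (HashLikePartitioner is a hypothetical name for illustration; the real class is org.apache.hadoop.mapreduce.lib.partition.HashPartitioner):

package partition;

import org.apache.hadoop.mapreduce.Partitioner;

// Roughly what Hadoop's default HashPartitioner does
public class HashLikePartitioner<K, V> extends Partitioner<K, V> {
	@Override
	public int getPartition(K key, V value, int numReduceTasks) {
		// Mask off the sign bit so the modulo result is never negative
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}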
Note: the job must be packaged as a jar, uploaded to the Linux cluster, and run there (I initially ran it straight from Eclipse without packaging, and it threw the exception shown below).

When the job finishes it produces two output files (part-r-00000 and part-r-00001), one per partition: the first holds the records with well-formed phone numbers, the second everything else.
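If you want to confirm the two partition files from code rather than from the shell, a minimal sketch using the HDFS FileSystem API could look like this (ListOutputs is a hypothetical helper, not part of the original job; it reuses the OUT_PATH address from above):

package partition;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListOutputs {
	public static void main(String[] args) throws Exception {
		FileSystem fs = FileSystem.get(new URI("hdfs://192.168.0.9:9000/myout"), new Configuration());
		// Expect part-r-00000 (11-digit phone numbers) and part-r-00001 (everything else)
		for (FileStatus st : fs.listStatus(new Path("/myout"))) {
			System.out.println(st.getPath() + "\t" + st.getLen() + " bytes");
		}
	}
}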

The exception thrown when running directly in Eclipse (the local job runner only supports a single reduce task, so a key routed to partition 1 — here the IP address 10.80.203.79, which is not 11 characters long — is out of range):

14/03/06 15:41:13 WARN mapred.LocalJobRunner: job_local_0001
java.io.IOException: Illegal partition for 10.80.203.79 (1)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1073)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:691)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at partition.FlowCount2$MyMapper.map(FlowCount2.java:120)
at partition.FlowCount2$MyMapper.map(FlowCount2.java:1)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:214)
14/03/06 15:41:14 INFO mapred.JobClient:  map 0% reduce 0%
14/03/06 15:41:14 INFO mapred.JobClient: Job complete: job_local_0001
14/03/06 15:41:14 INFO mapred.JobClient: Counters: 0



超哥's summary, for the record:

A partitioning example must be packaged as a jar to run. Uses: 1. produce multiple output files according to business needs; 2. run multiple reduce tasks, improving the overall efficiency of the job. A quick local sanity check of the partition logic follows below.
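Since getPartition is a pure function of the key, the partition logic can be checked without a cluster. A minimal sketch (MyPartitionDemo is a hypothetical helper; MyPartition ignores the value argument, so null is passed):

package partition;

import org.apache.hadoop.io.Text;

public class MyPartitionDemo {
	public static void main(String[] args) {
		MyPartition p = new MyPartition();
		// An 11-character key (a well-formed phone number) goes to partition 0 -> part-r-00000
		System.out.println(p.getPartition(new Text("13912345678"), null, 2)); // prints 0
		// Anything else (here an IP address, as in the exception above) goes to partition 1 -> part-r-00001
		System.out.println(p.getPartition(new Text("10.80.203.79"), null, 2)); // prints 1
	}
}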
