
Learning Hadoop's serialization mechanism in a state of total confusion — a traffic-sum MapReduce program development case — and sorting the traffic totals


One: The concept of serialization

Serialization is the process of turning a structured object into a byte stream.
Deserialization is the reverse process: turning a byte stream back into a structured object.
Java's built-in serialization is java.io.Serializable.

Two: Characteristics of Hadoop's serialization

(1) Properties of the serialization format:
  Compact: uses storage space efficiently.
  Fast: little extra overhead when reading and writing data.
  Extensible: can transparently read data written in an older format.
  Interoperable: supports interaction across multiple languages.

(2) Hadoop's serialization format: the Writable interface

Three: What Hadoop serialization is used for:

(1) In a distributed environment serialization serves two purposes: inter-process communication and persistent storage.
(2) Hadoop uses it for communication between nodes.


Four: The Writable interface (a class that needs Hadoop serialization implements this interface)

(1) Writable is a simple, efficient serialization interface built on DataInput and DataOutput.

(2) Every key and value type used in MapReduce must implement the Writable interface.

(3) Every key type in MapReduce must additionally implement the WritableComparable interface, since keys are sorted. (A simplified sketch of both interfaces follows.)
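
For reference, the shape of the two interfaces is roughly the following (a simplified sketch of the org.apache.hadoop.io definitions, with annotations and javadoc omitted; each interface lives in its own source file):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Serialization contract for Hadoop value (and key) types.
public interface Writable {
    void write(DataOutput out) throws IOException;     // serialize this object's fields to the stream
    void readFields(DataInput in) throws IOException;  // read the fields back, in the same order they were written
}

// Keys must also be comparable so the framework can sort them during the shuffle.
public interface WritableComparable<T> extends Writable, Comparable<T> {
    // no extra methods of its own
}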


1: Create a FlowBean entity class that implements the serialization methods:

package com.flowSum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/***
 * @author Administrator
 * 1: write() serializes the object to the output stream
 * 2: readFields() deserializes the object back from the input stream bytes
 * 3: (later) implement WritableComparable;
 *    comparing Java value objects usually also means overriding toString(), hashCode() and equals()
 */
public class FlowBean implements Writable{

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // A parameterized constructor so the object's data is easy to initialize
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    // During deserialization reflection calls the no-arg constructor, so define one
    public FlowBean() {
    }

    // Override toString(): "upFlow <tab> downFlow <tab> sumFlow"
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's data from the stream;
    // fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's data to the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
}
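
To sanity-check that write() and readFields() stay in step, the bean can be pushed through an in-memory byte stream; a minimal local sketch (class name and sample values invented for illustration, no cluster required):

package com.flowSum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Quick local round-trip check of FlowBean's serialization order.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // serialize into an in-memory buffer
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // deserialize from the same bytes; fields come back in the order they were written
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // expected: "2481\t24681\t27162"
    }
}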

Create a FlowSumMapper class that extends the Mapper class:

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/***
 * @author Administrator
 * 1: FlowBean is our own data type; to be shipped between Hadoop nodes it has to follow
 *    Hadoop's serialization rules, i.e. implement the corresponding Hadoop interface.
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, for UTF-8 sequences.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    // Take one line of the log, split it into fields, pull out the ones we need
    // (phone number, upstream traffic, downstream traffic) and emit them as a key-value pair

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // grab one line of data
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, "\t");
        // the phone number field
        String phoneNumber = fields[1];
        // the upstream traffic field
        long up_flow = Long.parseLong(fields[7]);
        // the downstream traffic field
        long down_flow = Long.parseLong(fields[8]);

        // finally, wrap the data into a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}

Create a FlowSumReducer class that extends the Reducer class:

package com.flowSum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

    // The framework calls our reduce method once per group <phoneNumber, {flowbean, flowbean, ...}>.
    // The reduce logic simply walks the values, sums them up and writes the result out.
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // counters for upstream and downstream traffic
        long up_flow_counter = 0;
        long down_flow_counter = 0;

        // accumulate upstream and downstream traffic
        for(FlowBean bean : values){
            up_flow_counter += bean.getUpFlow();
            down_flow_counter += bean.getDownFlow();
        }

        // write out the result
        context.write(key, new FlowBean(key.toString(), up_flow_counter, down_flow_counter));
    }
}

Create a FlowSumRunner that extends Configured and implements Tool, the standard pattern for a job description and submission class:

package com.flowSum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;
/***
 * @author Administrator
 * 1: the standard pattern for a job description and submission class
 */
public class FlowSumRunner extends Configured implements Tool{

    @Override
    public int run(String[] args) throws Exception {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job
        Job job = Job.getInstance(conf);

        // tell the job which jar contains the classes it uses
        job.setJarByClass(FlowSumRunner.class);

        // the mapper and reducer classes used by this job
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // the key-value types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // the key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // the path of the input data to process
        // FileInputFormat is the base class for all InputFormat implementations that read from files;
        // it keeps the list of input files for the job and implements the split computation,
        // while the way records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // the path where the results will be written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster and wait for it to finish
        // job.waitForCompletion(true);
        // return 0 on success, 1 otherwise
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // standard ToolRunner invocation
        int res = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
        // exit when finished
        System.exit(res);
    }
}
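
One practical benefit of the Configured/Tool/ToolRunner pattern is that Hadoop's generic command-line options are parsed for you, so job settings can be overridden without recompiling, for example (jar name and paths match the ones used later in this post):

hadoop jar flow.jar com.flowSum.FlowSumRunner -D mapreduce.job.reduces=1 /flow/data /flow/output

Note, though, that for such -D overrides to actually reach the job, run() should build the Job from getConf() (the Configuration that ToolRunner populated) rather than from a fresh new Configuration() as above; the fresh one still works, it just ignores the generic options, which is also why the logs below print the warning "Hadoop command-line option parsing not performed".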

然后打包上傳到虛擬機上面,還有模擬數據,過程省略,貼出模擬數據:

1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200

The jar and the simulated data are now on the VM.

然后將數據上傳到hdfs集群(這里是偽分布式集群)上面:

On the cluster, create an empty folder called flow, create a data folder inside it, and finally put the data file into the data folder (roughly the commands sketched below).
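
(The screenshots of these steps are omitted; the shell commands amount to roughly the following, with the data file name being illustrative.)

hadoop fs -mkdir -p /flow/data        # create the input directory on HDFS
hadoop fs -put flow.log /flow/data    # upload the simulated data file
hadoop fs -ls /flow/data              # confirm the upload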

然后執行程序,由于是需要傳入參數的,所以注意最后兩個是參數:


然后就報了一個這樣子的錯,我也是一臉懵逼:

Error: java.lang.ClassCastException: class com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider$Text
    at java.lang.Class.asSubclass(Class.java:3165)
    at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:884)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:981)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:391)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:80)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

?然后根據你現在學的知識肯定已經被別人學過的理論,and一定有好心的大神會貼出來錯誤的心態百度一下,然后解決問題:

原來是Text的包導錯了(還是小心點好。不然夠喝一壺的了)

不是:import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;

而是:import org.apache.hadoop.io.Text;

然后打包上傳到虛擬機上面運行,然后你會發現這個錯誤:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://master:9000/flow/output already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
    at com.flowSum.FlowSumRunner.run(FlowSumRunner.java:55)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.flowSum.FlowSumRunner.main(FlowSumRunner.java:60)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:212)

然后你把這個/flow/output的這個output文件夾刪除了,因為輸出文件夾是程序自動創建的:


最后運行程序(由于是需要傳入參數的,所以注意最后兩個是參數):

?然后就報數據越界的異常,我想可能是測試數據不干凈:

Error: java.lang.ArrayIndexOutOfBoundsException: 1
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:29)
    at com.flowSum.FlowSumMapper.map(FlowSumMapper.java:1)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

然后手動造了一份數據,如下所示:

(好吧,后來測試上面的測試數據又可以運行了,總之多測試幾遍吧,都是坑!!!)

1363157985066 13726230501 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 241 200
1363157985061 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985062 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 3481 681 200
1363157985063 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 4481 4681 200
1363157985064 13726230504 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 5481 4681 200
1363157985065 13726230505 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 6481 2681 200
1363157985066 13726230506 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 7481 2481 200
1363157985067 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 8481 2461 200
1363157985067 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 281 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 2681 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 3481 24681 200
1363157985069 13726230509 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 4481 681 200
1363157985060 13726230500 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 24681 200
1363157985061 13726230501 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985066 13726230502 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 81 200
1363157985063 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985063 13726230504 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985064 13726230505 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 2681 200
1363157985065 13726230506 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 1481 681 200
1363157985066 13726230507 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 81 24681 200
1363157985067 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 481 241 200
1363157985068 13726230508 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 481 681 200
1363157985068 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 241 681 200

最后將String[] fields = StringUtils.split(line, "\t");修改為了27 String[] fields = StringUtils.split(line, " ");

(后來測試了一下,String[] fields = StringUtils.split(line, "\t");也可以,開始以為空格的大小也影響測試數據呢,代碼沒問題,就是測試數據的問題。

package com.flowSum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/***
 * @author Administrator
 * 1: FlowBean is our own data type; to be shipped between Hadoop nodes it has to follow
 *    Hadoop's serialization rules, i.e. implement the corresponding Hadoop interface.
 * 2: Text can be thought of as the Writable equivalent of java.lang.String, for UTF-8 sequences.
 */
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

    // Take one line of the log, split it into fields, pull out the ones we need
    // (phone number, upstream traffic, downstream traffic) and emit them as a key-value pair

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // grab one line of data
        String line = value.toString();
        // split it into fields
        String[] fields = StringUtils.split(line, " ");
        // the phone number field
        String phoneNumber = fields[1];
        // the upstream traffic field
        long up_flow = Long.parseLong(fields[7]);
        // the downstream traffic field
        long down_flow = Long.parseLong(fields[8]);

        // finally, wrap the data into a key-value pair and emit it
        context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
    }
}

打包上傳到虛擬機上面,然后運行(正常運行結果如下所示):

[root@master hadoop]# hadoop jar flow.jar com.flowSum.FlowSumRunner /flow/data /flow/output
17/09/20 09:35:26 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/20 09:35:26 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/20 09:35:27 INFO input.FileInputFormat: Total input paths to process : 1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: number of splits:1
17/09/20 09:35:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505814887677_0007
17/09/20 09:35:27 INFO impl.YarnClientImpl: Submitted application application_1505814887677_0007
17/09/20 09:35:27 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505814887677_0007/
17/09/20 09:35:27 INFO mapreduce.Job: Running job: job_1505814887677_0007
17/09/20 09:35:33 INFO mapreduce.Job: Job job_1505814887677_0007 running in uber mode : false
17/09/20 09:35:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/20 09:35:37 INFO mapreduce.Job:  map 100% reduce 0%
17/09/20 09:35:43 INFO mapreduce.Job:  map 100% reduce 100%
17/09/20 09:35:43 INFO mapreduce.Job: Job job_1505814887677_0007 completed successfully
17/09/20 09:35:43 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=1179
        FILE: Number of bytes written=187971
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=2467
        HDFS: Number of bytes written=279
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2691
        Total time spent by all reduces in occupied slots (ms)=2582
        Total time spent by all map tasks (ms)=2691
        Total time spent by all reduce tasks (ms)=2582
        Total vcore-seconds taken by all map tasks=2691
        Total vcore-seconds taken by all reduce tasks=2582
        Total megabyte-seconds taken by all map tasks=2755584
        Total megabyte-seconds taken by all reduce tasks=2643968
    Map-Reduce Framework
        Map input records=23
        Map output records=23
        Map output bytes=1127
        Map output materialized bytes=1179
        Input split bytes=93
        Combine input records=0
        Combine output records=0
        Reduce input groups=10
        Reduce shuffle bytes=1179
        Reduce input records=23
        Reduce output records=10
        Spilled Records=46
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=126
        CPU time spent (ms)=1240
        Physical memory (bytes) snapshot=218099712
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=2374
    File Output Format Counters
        Bytes Written=279
[root@master hadoop]#

Then check the output results (screenshot omitted); each phone number comes out with its summed upstream, downstream and total traffic.


All in all, errors are unavoidable when learning something new; settle down and work through them.


2: Traffic-sum sorting case in practice:

This time the Mapper and Reducer are written as static inner classes. (I hit the same annoying problem as above: with String[] fields = StringUtils.split(line, "\t"); the job simply would not run and kept throwing array-index-out-of-bounds exceptions; switching to String[] fields = StringUtils.split(line, " "); made it run. Still baffling.)

package com.flowSort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class FlowSortMapReduce {

    /***
     * static inner mapper class
     */
    public static class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{

        // Take one line, split out the fields, wrap them in a FlowBean and emit the bean as the key
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // grab one line of data
            String line = value.toString();
            // split the line into fields
            String[] fields = StringUtils.split(line, " ");

            // pull out the values
            String phoneNumber = fields[0];
            long up_flow = Long.parseLong(fields[1]);
            long down_flow = Long.parseLong(fields[2]);

            // wrap the data and pass it on to the reducer
            context.write(new FlowBean(phoneNumber, up_flow, down_flow), NullWritable.get());
        }
    }

    /***
     * static inner reducer class
     */
    public static class FlowSortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{

        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            String phoneNumber = key.getPhoneNumber();
            context.write(new Text(phoneNumber), key);
        }
    }

    /***
     * main method
     */
    public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {
        // create the configuration
        Configuration conf = new Configuration();
        // get a job
        Job job = Job.getInstance(conf);

        // tell the job which jar contains the classes it uses
        job.setJarByClass(FlowSortMapReduce.class);

        // the mapper and reducer classes used by this job
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // the key-value types of the mapper output
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // the key-value types of the reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // the path of the input data to process
        // FileInputFormat is the base class for all InputFormat implementations that read from files;
        // it keeps the list of input files for the job and implements the split computation,
        // while the way records are read is left to subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // the path where the results will be written
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job to the cluster; exit with 0 on success, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Rework the entity class so records can be sorted by total traffic:

package com.flowSort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/***
 * @author Administrator
 * 1: write() serializes the object to the output stream
 * 2: readFields() deserializes the object back from the input stream bytes
 * 3: implements WritableComparable;
 *    comparing Java value objects usually also means overriding toString(), hashCode() and equals()
 */
public class FlowBean implements WritableComparable<FlowBean>{

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // A parameterized constructor so the object's data is easy to initialize
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    // During deserialization reflection calls the no-arg constructor, so define one
    public FlowBean() {
    }

    // Override toString(): "upFlow <tab> downFlow <tab> sumFlow"
    @Override
    public String toString() {
        return "" + upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's data from the stream;
    // fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's data to the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // The comparison used for sorting
    @Override
    public int compareTo(FlowBean o) {
        // return -1 if this total is larger, 1 if smaller or equal: a descending sort
        return sumFlow > o.sumFlow ? -1 : 1;
    }
}
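
One detail worth calling out: compareTo above never returns 0, not even when two beans have the same total. That is deliberate in this setup, because by default MapReduce also uses the key's comparator to group values for a reduce call, and the reducer writes exactly one line per group; if two phone numbers tied on total flow and compared as equal, they would be merged into one group and one of them would vanish from the output. An alternative that keeps the usual compareTo contract (returning 0 for ties) is to write one line per value inside the reduce loop instead, roughly like this (a sketch, not the author's code; it relies on Hadoop refreshing the reused key object for every value during iteration):

@Override
protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context)
        throws IOException, InterruptedException {
    // the key object is refreshed for each value, so every tied record still gets written out
    for (NullWritable v : values) {
        context.write(new Text(key.getPhoneNumber()), key);
    }
}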

This is how it comes out in the end; problems all the way, but it works:

[root@master hadoop]# hadoop jar flowsort.jar com.flowSort.FlowSortMapReduce /flow/output4 /flow/sortoutput
17/09/21 19:32:28 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/21 19:32:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/21 19:32:29 INFO input.FileInputFormat: Total input paths to process : 1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: number of splits:1
17/09/21 19:32:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1505991512603_0004
17/09/21 19:32:29 INFO impl.YarnClientImpl: Submitted application application_1505991512603_0004
17/09/21 19:32:29 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1505991512603_0004/
17/09/21 19:32:29 INFO mapreduce.Job: Running job: job_1505991512603_0004
17/09/21 19:32:33 INFO mapreduce.Job: Job job_1505991512603_0004 running in uber mode : false
17/09/21 19:32:33 INFO mapreduce.Job:  map 0% reduce 0%
17/09/21 19:32:38 INFO mapreduce.Job:  map 100% reduce 0%
17/09/21 19:32:44 INFO mapreduce.Job:  map 100% reduce 100%
17/09/21 19:32:44 INFO mapreduce.Job: Job job_1505991512603_0004 completed successfully
17/09/21 19:32:44 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=822
        FILE: Number of bytes written=187379
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=635
        HDFS: Number of bytes written=526
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=2031
        Total time spent by all reduces in occupied slots (ms)=2599
        Total time spent by all map tasks (ms)=2031
        Total time spent by all reduce tasks (ms)=2599
        Total vcore-seconds taken by all map tasks=2031
        Total vcore-seconds taken by all reduce tasks=2599
        Total megabyte-seconds taken by all map tasks=2079744
        Total megabyte-seconds taken by all reduce tasks=2661376
    Map-Reduce Framework
        Map input records=21
        Map output records=21
        Map output bytes=774
        Map output materialized bytes=822
        Input split bytes=109
        Combine input records=0
        Combine output records=0
        Reduce input groups=21
        Reduce shuffle bytes=822
        Reduce input records=21
        Reduce output records=21
        Spilled Records=42
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=121
        CPU time spent (ms)=700
        Physical memory (bytes) snapshot=218284032
        Virtual memory (bytes) snapshot=726839296
        Total committed heap usage (bytes)=137433088
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=526
    File Output Format Counters
        Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/sortoutput
Found 2 items
-rw-r--r--   1 root supergroup          0 2017-09-21 19:32 /flow/sortoutput/_SUCCESS
-rw-r--r--   1 root supergroup        526 2017-09-21 19:32 /flow/sortoutput/part-r-00000
[root@master hadoop]# hadoop fs -cat /flow/sortoutput/part-r-00000
13726238888    2481    24681    27162
13726230503    2481    24681    27162
13925057413    63      11058    11121
18320173382    18      9531     9549
13502468823    102     7335     7437
13660577991    9       6960     6969
13922314466    3008    3720     6728
13560439658    5892    400      6292
84138413       4116    1432     5548
15013685858    27      3659     3686
15920133257    20      3156     3176
13602846565    12      1938     1950
15989002119    3       1938     1941
13926435656    1512    200      1712
18211575961    12      1527     1539
13560436666    954     200      1154
13480253104    180     200      380
13760778710    120     200      320
13826544101    0       200      200
13926251106    0       200      200
13719199419    0       200      200
[root@master hadoop]#


