Getting Started with MapReduce, Part 2: Traffic Monitoring


3. Traffic monitoring aggregation (implemented with LongWritable)

HDFS input file: /tmp/flow.txt. Its contents:

13770759991 50 100 25 400
13770759991 800 600 500 100
13770759992 400 300 250 1400
13770759992 800 1200 600 900

Column meanings (phone number, uplink traffic, downlink traffic, uploaded bytes, downloaded bytes):

phoneNum    uppackBytes    downpackBytes    uploadBytes    downloadBytes
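To stage the sample file in HDFS, something like the following should work (assuming a local flow.txt with the contents above; -put and -cat are standard HDFS shell commands):

hadoop fs -put flow.txt /tmp/flow.txt    # upload the sample file
hadoop fs -cat /tmp/flow.txt             # verify the contents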

Code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowTest {

    public static void main(String[] args) {
        Path fromPath = new Path(args[0]);
        Path toPath = new Path(args[1]);
        try {
            Configuration conf = new Configuration();
            // Pass conf to the job (the original called getInstance() and left conf unused)
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowTest.class);

            FileInputFormat.addInputPath(job, fromPath);
            FileOutputFormat.setOutputPath(job, toPath);

            job.setMapperClass(FlowMapper.class);
            job.setReducerClass(FlowReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            job.waitForCompletion(true);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/*
 * Sample input (phoneNum uppackBytes downpackBytes uploadBytes downloadBytes):
 * 13770759991 50 100 25 400
 * 13770759991 800 600 500 100
 * 13770759992 400 300 250 1400
 * 13770759992 800 1200 600 900
 */
class FlowMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on runs of non-word characters (the whitespace between fields)
        String[] line = value.toString().split("\\W+");
        String phoneNum = line[0];
        long uppackBytes = Long.parseLong(line[1]);
        long downpackBytes = Long.parseLong(line[2]);
        long uploadBytes = Long.parseLong(line[3]);
        long downloadBytes = Long.parseLong(line[4]);
        // Pack the four counters into one '-'-delimited Text value, keyed by phone number
        context.write(new Text(phoneNum),
                new Text(uppackBytes + "-" + downpackBytes + "-" + uploadBytes + "-" + downloadBytes));
    }
}

class FlowReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text phoneNum, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sumUppack = 0L;
        long sumDownpack = 0L;
        long sumUpload = 0L;
        long sumDownload = 0L;
        // Unpack each '-'-delimited value and accumulate the four counters
        for (Text t : values) {
            String[] line = t.toString().split("-");
            sumUppack += Long.parseLong(line[0]);
            sumDownpack += Long.parseLong(line[1]);
            sumUpload += Long.parseLong(line[2]);
            sumDownload += Long.parseLong(line[3]);
        }
        context.write(phoneNum,
                new Text(sumUppack + "-" + sumDownpack + "-" + sumUpload + "-" + sumDownload));
    }
}
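Packing the four counters into a single "-"-delimited Text value works, but it pays for string formatting in every map() call and string parsing in every reduce() iteration, and the record's schema lives in ad-hoc string handling. Section 4 below removes both costs by moving the four fields into a custom Writable that serializes them as binary longs.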

Output:

Export the project as flow.jar and upload it to the server's /opt directory. Run:

hadoop jar flow.jar "FlowTest" "/tmp/flow.txt" "/tmp/flow/out"

Then list the output files:

hadoop fs -ls /tmp/flow/out/*
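For this input the result is deterministic, so the reducer's output file should contain the per-phone sums (for example, 850 = 50 + 800 for the first number's uplink traffic). Something like:

hadoop fs -cat /tmp/flow/out/part-r-00000
13770759991	850-700-525-500
13770759992	1200-1500-850-2300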


4. Traffic monitoring aggregation (implemented with a custom Writable class, NetflowWritable)

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NetflowTest {

    public static void main(String[] args) {
        Path fromPath = new Path(args[0]);
        Path toPath = new Path(args[1]);
        try {
            Configuration conf = new Configuration();
            // Pass conf to the job (the original left it unused)
            Job job = Job.getInstance(conf);
            job.setJarByClass(NetflowTest.class);

            FileInputFormat.addInputPath(job, fromPath);
            FileOutputFormat.setOutputPath(job, toPath);

            job.setMapperClass(NetflowMapper.class);
            job.setReducerClass(NetflowReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(NetflowWritable.class);
            // The reducer writes a Text key, so the output key class must be Text
            // (the original declared NullWritable here, which fails at runtime)
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NetflowWritable.class);

            job.waitForCompletion(true);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class NetflowWritable implements Writable {

    private long uppackBytes;
    private long downpackBytes;
    private long uploadBytes;
    private long downloadBytes;

    // A no-arg constructor is required: Hadoop instantiates Writables
    // via reflection during deserialization and fails without it
    public NetflowWritable() {
    }

    public NetflowWritable(long uppackBytes, long downpackBytes, long uploadBytes, long downloadBytes) {
        this.uppackBytes = uppackBytes;
        this.downpackBytes = downpackBytes;
        this.uploadBytes = uploadBytes;
        this.downloadBytes = downloadBytes;
    }

    public long getUppackBytes() { return uppackBytes; }
    public long getDownpackBytes() { return downpackBytes; }
    public long getUploadBytes() { return uploadBytes; }
    public long getDownloadBytes() { return downloadBytes; }

    public void set(long uppackBytes, long downpackBytes, long uploadBytes, long downloadBytes) {
        this.uppackBytes = uppackBytes;
        this.downpackBytes = downpackBytes;
        this.uploadBytes = uploadBytes;
        this.downloadBytes = downloadBytes;
    }

    // Deserialization must read the fields in exactly the order write() emits them
    @Override
    public void readFields(DataInput in) throws IOException {
        this.uppackBytes = in.readLong();
        this.downpackBytes = in.readLong();
        this.uploadBytes = in.readLong();
        this.downloadBytes = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(uppackBytes);
        out.writeLong(downpackBytes);
        out.writeLong(uploadBytes);
        out.writeLong(downloadBytes);
    }

    // toString() determines how the value is rendered in the text output file
    @Override
    public String toString() {
        return "NetflowWritable [uppackBytes=" + uppackBytes
                + ",downpackBytes=" + downpackBytes
                + ",uploadBytes=" + uploadBytes
                + ",downloadBytes=" + downloadBytes + "]";
    }
}

class NetflowMapper extends Mapper<LongWritable, Text, Text, NetflowWritable> {

    // Reuse one NetflowWritable instance across map() calls
    // instead of allocating a new object per input record
    private NetflowWritable nf = new NetflowWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split on any whitespace; the original used "\t", which breaks
        // on the space-separated sample file shown above
        String[] line = value.toString().split("\\s+");
        String phoneNum = line[0];
        long uppackBytes = Long.parseLong(line[1]);
        long downpackBytes = Long.parseLong(line[2]);
        long uploadBytes = Long.parseLong(line[3]);
        long downloadBytes = Long.parseLong(line[4]);
        nf.set(uppackBytes, downpackBytes, uploadBytes, downloadBytes);
        context.write(new Text(phoneNum), nf);
    }
}

class NetflowReducer extends Reducer<Text, NetflowWritable, Text, NetflowWritable> {

    @Override
    protected void reduce(Text phoneNum, Iterable<NetflowWritable> values, Context context)
            throws IOException, InterruptedException {
        long uppackBytes = 0L;
        long downpackBytes = 0L;
        long uploadBytes = 0L;
        long downloadBytes = 0L;
        // Accumulate the four counters across all records for this phone number
        for (NetflowWritable nw : values) {
            uppackBytes += nw.getUppackBytes();
            downpackBytes += nw.getDownpackBytes();
            uploadBytes += nw.getUploadBytes();
            downloadBytes += nw.getDownloadBytes();
        }
        context.write(phoneNum,
                new NetflowWritable(uppackBytes, downpackBytes, uploadBytes, downloadBytes));
    }
}
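One caveat worth knowing when working with custom Writables: Hadoop reuses a single object for the values iterable in reduce(), calling readFields() on the same instance for each record. Summing the fields immediately, as above, is safe; caching references across iterations is not. A minimal sketch of the pitfall, assuming the NetflowWritable class above:

// WRONG: the iterable hands back the same reused object every time,
// so this list ends up with N references to the values of the last record.
List<NetflowWritable> cached = new ArrayList<>();
for (NetflowWritable nw : values) {
    cached.add(nw); // should deep-copy instead, e.g. new NetflowWritable(nw.getUppackBytes(), ...)
}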

  

Output:

Export the project as netflow.jar and upload it to the server's /opt directory. Run:

hadoop jar netflow.jar "NetflowTest" "/tmp/flow.txt" "/tmp/netflow/out"

Then list the output files:

hadoop fs -ls /tmp/netflow/out/*
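With the same input file, the aggregated values are rendered through NetflowWritable.toString(), so the output should look roughly like:

hadoop fs -cat /tmp/netflow/out/part-r-00000
13770759991	NetflowWritable [uppackBytes=850,downpackBytes=700,uploadBytes=525,downloadBytes=500]
13770759992	NetflowWritable [uppackBytes=1200,downpackBytes=1500,uploadBytes=850,downloadBytes=2300]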


Reposted from: https://www.cnblogs.com/cangos/p/6422144.html
