日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 > 内容正文

编程问答

清洗弹幕数据,去不相关的列和空值,MapReduce

發布時間:2024/2/28 编程问答 32 豆豆
生活随笔收集整理的這篇文章主要介紹了《清洗弹幕数据,去不相关的列和空值,MapReduce》,小編覺得挺不錯的,現在分享給大家,幫大家做個參考。

原始數據:

話不多說,直接上代碼!

老樣子先pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the ETL MapReduce job. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.henu</groupId>
    <artifactId>ETL</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Single place to change the Hadoop version used by every artifact below. -->
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-nodemanager</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
</project>

ETLUtil

package com.wc;/*** @author George* @description etl工具類**/ public class ETLUtil {public static String oriString2ETLString(String ori){StringBuilder etlString = new StringBuilder();if (ori.startsWith("0")) {String[] splits = ori.split("\t");for (String split : splits) {if (!"25".equals(split) && !"".equals(split)) {etlString.append(split + "#");}}}return etlString.toString();} /*public static void main(String[] args) throws IOException {BufferedReader br = new BufferedReader(new FileReader("./data/test"));String str = "";while ((str = br.readLine())!=null){String string = oriString2ETLString(str);System.out.println(string);}br.close();}*/ }

BSETLMapper

package com.wc;import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author George* @description map階段**/ public class BSETLMapper extends Mapper<Object, Text, NullWritable,Text> {Text text = new Text();@Overrideprotected void map(Object key, Text value, Context context) throws IOException, InterruptedException {String etlString = ETLUtil.oriString2ETLString(value.toString());//檢查字符串是否為空白、空("")或nullif (StringUtils.isBlank(etlString))return;text.set(etlString);context.write(NullWritable.get(),text);} }

BSETLRunner

?

package com.wc; import com.AccountRegisterETL.AccountRegisterETLMapper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;import java.io.IOException;/*** @author George* @description**/ public class BSETLRunner implements Tool {private Configuration conf = null;public void setConf(Configuration conf) {this.conf = conf;}public Configuration getConf() {return this.conf;}public int run(String[] args) throws Exception {conf = this.getConf();conf.set("inpath", args[0]);conf.set("outpath", args[1]);Job job = Job.getInstance(conf);job.setJarByClass(BSETLRunner.class);job.setMapperClass(BSETLMapper.class);job.setMapOutputKeyClass(NullWritable.class);job.setMapOutputValueClass(Text.class);job.setNumReduceTasks(0);this.initJobInputPath(job);this.initJobOutputPath(job);return job.waitForCompletion(true) ? 
0 : 1;}private void initJobOutputPath(Job job) throws IOException {Configuration conf = job.getConfiguration();String outPathString = conf.get("outpath");FileSystem fs = FileSystem.get(conf);Path outPath = new Path(outPathString);if(fs.exists(outPath)){fs.delete(outPath, true);}FileOutputFormat.setOutputPath(job, outPath);}private void initJobInputPath(Job job) throws IOException {Configuration conf = job.getConfiguration();String inPathString = conf.get("inpath");FileSystem fs = FileSystem.get(conf);Path inPath = new Path(inPathString);if(fs.exists(inPath)){FileInputFormat.addInputPath(job, inPath);}else{throw new RuntimeException("HDFS中該文件目錄不存在:" + inPathString);}}public static void main(String[] args) {try {int resultCode = ToolRunner.run(new BSETLRunner(), args);if(resultCode == 0){System.out.println("Success!");}else{System.out.println("Fail!");}System.exit(resultCode);} catch (Exception e) {e.printStackTrace();System.exit(1);}} }

啟動集群!!!

上傳jar包,

上傳數據:

[root@henu2 ~]# hdfs dfs -put data.txt /

運行jar包:

[root@henu2 ~]# hdfs dfs -mkdir /out
[root@henu2 ~]# yarn jar ETL-1.0-SNAPSHOT.jar com.wc.BSETLRunner /data.txt /out/

得到結果數據:

結果展示:

?

總結

以上是生活随笔為你收集整理的清洗弹幕数据,去不相关的列和空值,MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。