Implementing Matrix Multiplication with MapReduce


This post is part of a series on the Hadoop family of products. Commonly used projects include Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, and Chukwa; newer additions include YARN, HCatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue, and others.

Since 2011, China has entered a turbulent era of big data, and the Hadoop family of software has claimed a broad share of big-data processing. Open-source projects and vendors alike have aligned their data software with Hadoop. Hadoop has grown from a niche, elite specialty into the de facto standard for big-data development. On top of the original Hadoop technology, a whole family of products has emerged, innovating continuously under the banner of "big data."

As developers in the IT industry, we should keep pace, seize the opportunity, and rise together with Hadoop!

About the author:

  • Zhang Dan (Conan), programmer: Java, R, PHP, Javascript
  • weibo: @Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

Please credit the source when reposting:
http://blog.fens.me/hadoop-mapreduce-matrix/

Preface

MapReduce opened the door to parallel computing, giving individual developers the ability to process big data. But using MapReduce well, that is, parallelizing an existing single-machine algorithm, is not easy. Much of the time we need to ask whether a single-machine algorithm can be recast in matrix form, so matrix operations become the foundation of algorithm parallelization.

The collaborative filtering algorithm in recommender systems, for example, is parallelized in MapReduce on exactly this matrix-based idea.

Table of Contents

  • Introduction to matrices
  • Matrix multiplication in R
  • Matrix multiplication in MapReduce
  • Sparse matrix multiplication in MapReduce

1. Introduction to Matrices

A matrix: in mathematics, an m×n matrix is a rectangular array of elements arranged in m rows and n columns. The elements of a matrix can be numbers, symbols, or mathematical expressions. Below is a 2-row, 3-column matrix made up of 6 numeric elements:

1 2 3
4 5 6

Matrix addition
Matrices of the same size (the same number of rows and columns) can be added or subtracted, elementwise: the entry at each position of the result is the sum (or difference) of the corresponding entries.

Example: the sum of two matrices

[ 1 3 1 ]   [ 0 0 5 ]   [ 1+0 3+0 1+5 ]   [ 1 3 6 ]
[ 1 0 0 ] + [ 7 5 0 ] = [ 1+7 0+5 0+0 ] = [ 8 5 0 ]

Matrix multiplication
Two matrices can be multiplied if and only if the number of columns of the first equals the number of rows of the second. Matrix multiplication is associative and distributive, but not commutative.
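
In symbols: if A is an m×n matrix and B is an n×p matrix, the product C = AB is the m×p matrix with entries

$$c_{ij} = \sum_{k=1}^{n} a_{ik}\, b_{kj}$$

This per-cell dot product is exactly what the reducer will compute in the MapReduce implementation below.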

Example: the product of two matrices

[ 1 0 2 ]   [ 3 1 ]   [ (1*3+0*2+2*1)  (1*1+0*1+2*0) ]   [ 5 1 ]
[-1 3 1 ] * [ 2 1 ] = [ (-1*3+3*2+1*1) (-1*1+3*1+1*0) ] = [ 4 2 ]
            [ 1 0 ]

2. Matrix Multiplication in R

> m1 <- matrix(c(1,0,2,-1,3,1), nrow=2, byrow=TRUE); m1
     [,1] [,2] [,3]
[1,]    1    0    2
[2,]   -1    3    1
> m2 <- matrix(c(3,1,2,1,1,0), nrow=3, byrow=TRUE); m2
     [,1] [,2]
[1,]    3    1
[2,]    2    1
[3,]    1    0
> m3 <- m1 %*% m2; m3
     [,1] [,2]
[1,]    5    1
[2,]    4    2

Multiplying matrices in R is trivial: a single %*% operator does it.
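
For contrast with the MapReduce version that follows, here is a minimal single-machine Java sketch of the same computation (the class name and layout are mine, for illustration only; this is not part of the original project):

public class MatrixMultiplyLocal {

    // Plain triple-loop matrix multiplication: c[i][j] = sum over k of a[i][k] * b[k][j].
    public static int[][] multiply(int[][] a, int[][] b) {
        int m = a.length, n = b.length, p = b[0].length;
        int[][] c = new int[m][p];
        for (int i = 0; i < m; i++) {
            for (int j = 0; j < p; j++) {
                for (int k = 0; k < n; k++) {
                    c[i][j] += a[i][k] * b[k][j];
                }
            }
        }
        return c;
    }

    public static void main(String[] args) {
        int[][] m1 = { { 1, 0, 2 }, { -1, 3, 1 } };
        int[][] m2 = { { 3, 1 }, { 2, 1 }, { 1, 0 } };
        for (int[] row : multiply(m1, m2)) {
            System.out.println(java.util.Arrays.toString(row)); // prints [5, 1] then [4, 2]
        }
    }
}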

3. Matrix Multiplication in MapReduce

Implementation plan. The core idea: the mapper emits every matrix element once for each output cell it contributes to, keyed by that cell's coordinates "i,j"; the shuffle groups all of cell (i,j)'s contributions together; and the reducer computes the dot product for that cell. The steps:

• Create two matrix data files: m1.csv, m2.csv
• Create the driver program: MainRun.java
• Create the MR program: MartrixMultiply.java

1). Create the two matrix data files m1.csv and m2.csv

    m1.csv

1,0,2
-1,3,1

    m2.csv

3,1
2,1
1,0

2). Create the driver program: MainRun.java

The driver program:

package org.conan.myhadoop.matrix;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class MainRun {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        martrixMultiply();
    }

    public static void martrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/m1.csv");            // local data files
        path.put("m2", "logfile/matrix/m2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");      // HDFS directory
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            MartrixMultiply.run(path);                      // launch the job
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {                        // remote configuration for the Hadoop cluster
        JobConf conf = new JobConf(MainRun.class);
        conf.setJobName("MartrixMultiply");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}

3). Create the MR program: MartrixMultiply.java

The MapReduce program:

package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class MartrixMultiply {

    public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;        // input file name: m1 or m2
        private int rowNum = 2;     // number of rows of matrix A
        private int colNum = 2;     // number of columns of matrix B
        private int rowIndexA = 1;  // current row of matrix A
        private int rowIndexB = 1;  // current row of matrix B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();   // tells us which dataset this split belongs to
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                // row rowIndexA of A contributes to every output cell (rowIndexA, i),
                // where i runs over the columns of the result (the columns of B)
                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 1; j <= tokens.length; j++) {
                        Text v = new Text("A:" + j + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + " " + v.toString());
                    }
                }
                rowIndexA++;
            } else if (flag.equals("m2")) {
                // row rowIndexB of B contributes to every output cell (i, j),
                // where i runs over the rows of the result (the rows of A)
                for (int i = 1; i <= rowNum; i++) {
                    for (int j = 1; j <= tokens.length; j++) {
                        Text k = new Text(i + "," + j);
                        Text v = new Text("B:" + rowIndexB + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + " " + v.toString());
                    }
                }
                rowIndexB++;
            }
        }
    }

    public static class MatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // collect the A and B contributions for output cell "key", indexed by k
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");
            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");
                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);
                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                }
            }

            // dot product: sum over k of A(i,k) * B(k,j)
            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(mapB.get(mapk));
            }
            context.write(key, new IntWritable(result));
            System.out.println();
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2)); // load both input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
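
The run() method above depends on a small HdfsDAO helper class from the author's repository (linked at the end of this post). For readers who want a self-contained build, a minimal sketch of such a helper against Hadoop's standard FileSystem API might look like the following; this is an assumption-based stand-in, not the author's actual implementation:

package org.conan.myhadoop.hdfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical minimal stand-in for the author's HdfsDAO helper.
public class HdfsDAO {

    private final String hdfsPath;    // e.g. "hdfs://192.168.1.210:9000"
    private final Configuration conf; // JobConf extends Configuration, so MainRun.config() works here

    public HdfsDAO(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    // Recursively delete an HDFS path.
    public void rmr(String folder) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.delete(new Path(folder), true);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    // Create a directory on HDFS.
    public void mkdirs(String folder) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.mkdirs(new Path(folder));
        System.out.println("Create: " + folder);
        fs.close();
    }

    // Copy a local file to HDFS.
    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }
}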

Run log:

Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/m1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/m2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
2014-1-15 10:48:03 org.apache.hadoop.util.NativeCodeLoader <clinit> 警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles 警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles 警告: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-15 10:48:03 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus 信息: Total input paths to process : 2
2014-1-15 10:48:03 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit> 警告: Snappy native library not loaded
2014-1-15 10:48:04 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: Running job: job_local_0001
2014-1-15 10:48:04 org.apache.hadoop.mapred.Task initialize 信息: Using ResourceCalculatorPlugin : null
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: io.sort.mb = 100
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: data buffer = 79691776/99614720
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: record buffer = 262144/327680
1,1 A:1,1
1,1 A:2,0
1,1 A:3,2
1,2 A:1,1
1,2 A:2,0
1,2 A:3,2
2,1 A:1,-1
2,1 A:2,3
2,1 A:3,1
2,2 A:1,-1
2,2 A:2,3
2,2 A:3,1
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush 信息: Starting flush of map output
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill 信息: Finished spill 0
2014-1-15 10:48:04 org.apache.hadoop.mapred.Task done 信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-1-15 10:48:05 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: map 0% reduce 0%
2014-1-15 10:48:07 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task sendDone 信息: Task 'attempt_local_0001_m_000000_0' done.
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task initialize 信息: Using ResourceCalculatorPlugin : null
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: io.sort.mb = 100
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: data buffer = 79691776/99614720
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: record buffer = 262144/327680
1,1 B:1,3
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush 信息: Starting flush of map output
1,2 B:1,1
2,1 B:1,3
2,2 B:1,1
1,1 B:2,2
1,2 B:2,1
2,1 B:2,2
2,2 B:2,1
1,1 B:3,1
1,2 B:3,0
2,1 B:3,1
2,2 B:3,0
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill 信息: Finished spill 0
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task done 信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2014-1-15 10:48:08 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: map 100% reduce 0%
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task sendDone 信息: Task 'attempt_local_0001_m_000001_0' done.
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task initialize 信息: Using ResourceCalculatorPlugin : null
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge 信息: Merging 2 sorted segments
2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge 信息: Down to the last merge-pass, with 2 segments left of total size: 294 bytes
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
1,1:(B:1,3)(B:2,2)(B:3,1)(A:1,1)(A:2,0)(A:3,2)
1,2:(A:1,1)(A:2,0)(A:3,2)(B:1,1)(B:2,1)(B:3,0)
2,1:(B:1,3)(B:2,2)(B:3,1)(A:1,-1)(A:2,3)(A:3,1)
2,2:(A:1,-1)(A:2,3)(A:3,1)(B:1,1)(B:2,1)(B:3,0)
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task done 信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task commit 信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-1-15 10:48:10 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask 信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
2014-1-15 10:48:13 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息: reduce > reduce
2014-1-15 10:48:13 org.apache.hadoop.mapred.Task sendDone 信息: Task 'attempt_local_0001_r_000000_0' done.
2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: map 100% reduce 100%
2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: Job complete: job_local_0001
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息: Counters: 19
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:   File Output Format Counters
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Bytes Written=24
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:   FileSystemCounters
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     FILE_BYTES_READ=1713
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     HDFS_BYTES_READ=75
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     FILE_BYTES_WRITTEN=125314
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     HDFS_BYTES_WRITTEN=114
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:   File Input Format Counters
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Bytes Read=30
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:   Map-Reduce Framework
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Map output materialized bytes=302
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Map input records=5
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Reduce shuffle bytes=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Spilled Records=48
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Map output bytes=242
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Total committed heap usage (bytes)=764215296
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     SPLIT_RAW_BYTES=220
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Combine input records=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Reduce input records=24
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Reduce input groups=4
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Combine output records=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Reduce output records=4
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log 信息:     Map output records=24
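
The result lands in /user/hdfs/matrix/output. With TextOutputFormat and its default tab separator, the part file should contain the four cells of the product, matching the R result m3 above (and the counter Bytes Written=24 agrees with these four 6-byte lines):

1,1	5
1,2	1
2,1	4
2,2	2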

4. Sparse Matrix Multiplication in MapReduce

When we process real-world data with matrices, they are usually extremely sparse, so to save storage we typically keep only the nonzero entries.

Let's now build the sparse-matrix version:

• Matrix multiplication in R
• Create two sparse matrix data files: sm1.csv, sm2.csv
• Modify the driver program: MainRun.java
• Create a new MR program: SparseMartrixMultiply.java

1). Matrix multiplication in R

The R program:

> m1 <- matrix(c(1,0,0,3,2,5,0,4,0,0,0,1,4,7,1,2), nrow=4, byrow=TRUE); m1
     [,1] [,2] [,3] [,4]
[1,]    1    0    0    3
[2,]    2    5    0    4
[3,]    0    0    0    1
[4,]    4    7    1    2
> m2 <- matrix(c(5,0,0,2,0,0,3,1), nrow=4, byrow=TRUE); m2
     [,1] [,2]
[1,]    5    0
[2,]    0    2
[3,]    0    0
[4,]    3    1
> m3 <- m1 %*% m2; m3
     [,1] [,2]
[1,]   14    3
[2,]   22   14
[3,]    3    1
[4,]   26   16

2). Create the two sparse matrix data files sm1.csv and sm2.csv

Only the nonzero entries are stored, in three columns: the first column is the row index in the original matrix, the second the column index, and the third the value.

    sm1.csv

1,1,1
1,4,3
2,1,2
2,2,5
2,4,4
3,4,1
4,1,4
4,2,7
4,3,1
4,4,2

    sm2.csv

1,1,5
2,2,2
4,1,3
4,2,1

3). Modify the driver program: MainRun.java

Add a launch method for SparseMartrixMultiply. Note that the local files are now sm1.csv and sm2.csv, but they are still copied to the HDFS names m1 and m2, which is why the mapper's file-name check (flag.equals("m1")) keeps working unchanged:

public static void main(String[] args) {
    sparseMartrixMultiply();
}

public static void sparseMartrixMultiply() {
    Map<String, String> path = new HashMap<String, String>();
    path.put("m1", "logfile/matrix/sm1.csv");           // local data files
    path.put("m2", "logfile/matrix/sm2.csv");
    path.put("input", HDFS + "/user/hdfs/matrix");      // HDFS directory
    path.put("input1", HDFS + "/user/hdfs/matrix/m1");
    path.put("input2", HDFS + "/user/hdfs/matrix/m2");
    path.put("output", HDFS + "/user/hdfs/matrix/output");

    try {
        SparseMartrixMultiply.run(path);                // launch the job
    } catch (Exception e) {
        e.printStackTrace();
    }
    System.exit(0);
}

4). Create the MR program: SparseMartrixMultiply.java

• The map function changes; the reduce function is nearly unchanged (it only gains a zero default for entries that are missing from matrix B)
• The variables that tracked the current row are dropped, because each sparse record already carries its own row and column coordinates
package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class SparseMartrixMultiply {

    public static class SparseMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;      // input file name: m1 or m2
        private int rowNum = 4;   // number of rows of matrix A
        private int colNum = 2;   // number of columns of matrix B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();   // tells us which dataset this split belongs to
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];
                // A(row, col) contributes to every output cell in row "row"
                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(row + "," + i);
                    Text v = new Text("A:" + col + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + " " + v.toString());
                }
            } else if (flag.equals("m2")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];
                // B(row, col) contributes to every output cell in column "col"
                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(i + "," + col);
                    Text v = new Text("B:" + row + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + " " + v.toString());
                }
            }
        }
    }

    public static class SparseMatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");
            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");
                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);
                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);
                }
            }

            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                // entries missing from B are zeros in the sparse representation
                String bVal = mapB.containsKey(mapk) ? mapB.get(mapk) : "0";
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(bVal);
            }
            context.write(key, new IntWritable(result));
            System.out.println();
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(SparseMartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SparseMatrixMapper.class);
        job.setReducerClass(SparseMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2)); // load both input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

Run output:

Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/sm1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/sm2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
2014-1-15 11:57:31 org.apache.hadoop.util.NativeCodeLoader <clinit> 警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles 警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles 警告: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-15 11:57:31 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus 信息: Total input paths to process : 2
2014-1-15 11:57:31 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit> 警告: Snappy native library not loaded
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: Running job: job_local_0001
2014-1-15 11:57:31 org.apache.hadoop.mapred.Task initialize 信息: Using ResourceCalculatorPlugin : null
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: io.sort.mb = 100
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: data buffer = 79691776/99614720
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: record buffer = 262144/327680
1,1 A:1,1
1,2 A:1,1
1,1 A:4,3
1,2 A:4,3
2,1 A:1,2
2,2 A:1,2
2,1 A:2,5
2,2 A:2,5
2,1 A:4,4
2,2 A:4,4
3,1 A:4,1
3,2 A:4,1
4,1 A:1,4
4,2 A:1,4
4,1 A:2,7
4,2 A:2,7
4,1 A:3,1
4,2 A:3,1
4,1 A:4,2
4,2 A:4,2
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush 信息: Starting flush of map output
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill 信息: Finished spill 0
2014-1-15 11:57:31 org.apache.hadoop.mapred.Task done 信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-1-15 11:57:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: map 0% reduce 0%
2014-1-15 11:57:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task sendDone 信息: Task 'attempt_local_0001_m_000000_0' done.
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task initialize 信息: Using ResourceCalculatorPlugin : null
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: io.sort.mb = 100
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: data buffer = 79691776/99614720
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init> 信息: record buffer = 262144/327680
1,1 B:1,5
2,1 B:1,5
3,1 B:1,5
4,1 B:1,5
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush 信息: Starting flush of map output
1,2 B:2,2
2,2 B:2,2
3,2 B:2,2
4,2 B:2,2
1,1 B:4,3
2,1 B:4,3
3,1 B:4,3
4,1 B:4,3
1,2 B:4,1
2,2 B:4,1
3,2 B:4,1
4,2 B:4,1
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill 信息: Finished spill 0
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task done 信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2014-1-15 11:57:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: map 100% reduce 0%
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task sendDone 信息: Task 'attempt_local_0001_m_000001_0' done.
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task initialize 信息: Using ResourceCalculatorPlugin : null
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge 信息: Merging 2 sorted segments
2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge 信息: Down to the last merge-pass, with 2 segments left of total size: 436 bytes
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
1,1:(B:1,5)(B:4,3)(A:1,1)(A:4,3)
1,2:(A:1,1)(A:4,3)(B:2,2)(B:4,1)
2,1:(B:1,5)(B:4,3)(A:1,2)(A:2,5)(A:4,4)
2,2:(A:1,2)(A:2,5)(A:4,4)(B:4,1)(B:2,2)
3,1:(B:1,5)(B:4,3)(A:4,1)
3,2:(A:4,1)(B:2,2)(B:4,1)
4,1:(B:4,3)(B:1,5)(A:1,4)(A:2,7)(A:3,1)(A:4,2)
4,2:(A:1,4)(A:2,7)(A:3,1)(A:4,2)(B:2,2)(B:4,1)
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task done 信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息:
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task commit 信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-1-15 11:57:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask 信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
2014-1-15 11:57:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate 信息: reduce > reduce
2014-1-15 11:57:40 org.apache.hadoop.mapred.Task sendDone 信息: Task 'attempt_local_0001_r_000000_0' done.
2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: map 100% reduce 100%
2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob 信息: Job complete: job_local_0001
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息: Counters: 19
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:   File Output Format Counters
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Bytes Written=53
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:   FileSystemCounters
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     FILE_BYTES_READ=2503
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     HDFS_BYTES_READ=266
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     FILE_BYTES_WRITTEN=126274
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     HDFS_BYTES_WRITTEN=347
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:   File Input Format Counters
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Bytes Read=98
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:   Map-Reduce Framework
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Map output materialized bytes=444
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Map input records=14
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Reduce shuffle bytes=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Spilled Records=72
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Map output bytes=360
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Total committed heap usage (bytes)=764215296
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     SPLIT_RAW_BYTES=220
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Combine input records=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Reduce input records=36
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Reduce input groups=8
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Combine output records=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Reduce output records=8
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log 信息:     Map output records=36
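
As before, the part file under /user/hdfs/matrix/output should hold one tab-separated cell per line; the eight values match the R result m3 for the sparse example (and these eight lines total the 53 bytes reported by Bytes Written=53):

1,1	14
1,2	3
2,1	22
2,2	14
3,1	3
3,2	1
4,1	26
4,2	16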

The full source code has been uploaded to github:
    https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/matrix

With that, we have implemented matrix multiplication as a MapReduce program! With this matrix-computation building block in place, we can go on to do much more.
