
Parallel Implementation of the PageRank Algorithm


This article belongs to the "Algorithm is King" series, which covers a cross-disciplinary mix of algorithms: computer algorithms, data mining (machine learning) algorithms, statistical algorithms, financial algorithms, and more. In the era of big data, algorithms have become the stars at the top of the pyramid. A good algorithm can build a great empire, as it did for Google.

The era of "algorithm is king" has officially arrived....

About the author:

  • Zhang Dan (Conan), programmer: Java, R, PHP, JavaScript
  • weibo: @Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

Please credit the source when reposting:
http://blog.fens.me/algorithm-pagerank-mapreduce/

Preface

With the PageRank algorithm model, Google scores every page on the web. Processing data at that scale is impossible on a single machine, so parallelizing the PageRank computation is the focus of this article.

This article continues the previous one, PageRank Algorithm Implemented in R, and turns the single-machine implementation of PageRank into a parallel one that runs on a cluster using the MapReduce computing framework.

Table of Contents

  • Principles of the distributed PageRank algorithm
  • Distributed programming with MapReduce

1. Principles of the distributed PageRank algorithm

For the single-machine algorithm, please refer to the article PageRank Algorithm Implemented in R.

The idea behind the distributed PageRank algorithm is simple: achieve parallelism by expressing the computation as matrix operations.

1). Store the columns of the adjacency matrix as data rows

The adjacency matrix (with the damping factor already applied):

          [,1]   [,2]   [,3]   [,4]
[1,] 0.0375000 0.0375 0.0375 0.0375
[2,] 0.3208333 0.0375 0.0375 0.8875
[3,] 0.3208333 0.4625 0.0375 0.0375
[4,] 0.3208333 0.4625 0.8875 0.0375

Stored by rows in HDFS (each row in the file holds one column of the matrix):

1 0.037499994,0.32083333,0.32083333,0.32083333
2 0.037499994,0.037499994,0.4625,0.4625
3 0.037499994,0.037499994,0.037499994,0.88750005
4 0.037499994,0.88750005,0.037499994,0.037499994
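These numbers can be reproduced without Hadoop. Below is a single-machine Java sketch, purely for illustration (the class BuildMatrixSketch and its variable names are invented here and are not part of the project), of the formula the AdjacencyMatrix job applies later: entry (i, j) = (1 − d)/N + d · A[i][j] / outdegree(j), with damping factor d = 0.85, N = 4 pages, and the example link pairs introduced below as page.csv.

// Single-machine sketch (not part of the MapReduce jobs): shows how each
// matrix entry is derived from the links and the damping factor d = 0.85.
import java.util.Locale;

public class BuildMatrixSketch {
    public static void main(String[] args) {
        int N = 4;
        float d = 0.85f;
        int[][] links = { {1, 2}, {1, 3}, {1, 4}, {2, 3}, {2, 4}, {3, 4}, {4, 2} }; // (source, target) pairs

        float[][] A = new float[N][N]; // raw adjacency: A[target][source]
        int[] out = new int[N];        // out-degree of each source page
        for (int[] l : links) {
            A[l[1] - 1][l[0] - 1] = 1;
            out[l[0] - 1]++;
        }

        float[][] G = new float[N][N]; // damped transition matrix
        for (int j = 0; j < N; j++) {
            int deg = out[j] == 0 ? 1 : out[j]; // avoid division by zero
            for (int i = 0; i < N; i++) {
                G[i][j] = (1 - d) / N + d * A[i][j] / deg;
            }
        }

        // Prints the matrix shown above; e.g. column 1 is 0.0375, 0.3208, 0.3208, 0.3208
        for (int i = 0; i < N; i++) {
            System.out.printf(Locale.US, "%.7f %.7f %.7f %.7f%n", G[i][0], G[i][1], G[i][2], G[i][3]);
        }
    }
}

Each column j of this matrix is what the MapReduce job later stores as the HDFS row for page j.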

2). Iterate: compute the matrix's eigenvector (power iteration)

The map stage:

  • input: the adjacency matrix and the PR values
  • output: key is the row number of the PR entry; value is a term of the multiply-and-sum expression between the adjacency matrix and the PR values

The reduce stage:

  • input: key is the row number of the PR entry; value is a term of the multiply-and-sum expression between the adjacency matrix and the PR values
  • output: key is the row number of the PR entry; value is the computed result, i.e. the new PR value

Iteration 1:

0.0375000 0.0375 0.0375 0.0375     1     0.150000
0.3208333 0.0375 0.0375 0.8875  *  1  =  1.283333
0.3208333 0.4625 0.0375 0.0375     1     0.858333
0.3208333 0.4625 0.8875 0.0375     1     1.708333

Iteration 2:

0.0375000 0.0375 0.0375 0.0375     0.150000     0.150000
0.3208333 0.0375 0.0375 0.8875  *  1.283333  =  1.6445833
0.3208333 0.4625 0.0375 0.0375     0.858333     0.7379167
0.3208333 0.4625 0.8875 0.0375     1.708333     1.4675000

… after 10 iterations

The eigenvector (unnormalized PR values):

0.1500000
1.4955721
0.8255034
1.5289245

3). Normalize the PR values

0.150000                                                     0.0375000
1.4955721  / (0.15 + 1.4955721 + 0.8255034 + 1.5289245)  =  0.3738930
0.8255034                                                    0.2063759
1.5289245                                                    0.3822311
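As a sanity check on the whole procedure, here is a minimal single-machine sketch (again illustrative only; the MapReduce jobs below perform the same computation in a distributed fashion, and the class name is made up). It runs the power iteration on the damped matrix above and then normalizes the result; its first two iterations match the hand computation above, and after 10 iterations the normalized values should come out close to 0.0375, 0.3739, 0.2064 and 0.3822.

// Single-machine sketch of the full computation, for intuition only.
import java.util.Locale;

public class PowerIterationSketch {
    public static void main(String[] args) {
        float[][] G = {
            {0.0375000f, 0.0375f, 0.0375f, 0.0375f},
            {0.3208333f, 0.0375f, 0.0375f, 0.8875f},
            {0.3208333f, 0.4625f, 0.0375f, 0.0375f},
            {0.3208333f, 0.4625f, 0.8875f, 0.0375f}
        };
        float[] pr = {1f, 1f, 1f, 1f}; // initial PR values

        for (int iter = 0; iter < 10; iter++) {      // 10 power iterations
            float[] next = new float[pr.length];
            for (int i = 0; i < G.length; i++) {
                for (int j = 0; j < pr.length; j++) {
                    next[i] += G[i][j] * pr[j];      // one row of the matrix-vector product
                }
            }
            pr = next;
        }

        float sum = 0f;                              // normalize so the values sum to 1
        for (float v : pr) sum += v;
        for (int i = 0; i < pr.length; i++) {
            System.out.printf(Locale.US, "%d %.7f%n", i + 1, pr[i] / sum);
        }
    }
}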

2. Distributed programming with MapReduce

Breakdown of the MapReduce workflow:

HDFS directories:

  • input (/user/hdfs/pagerank): root HDFS directory for the job
  • input_pr (/user/hdfs/pagerank/pr): temporary directory holding the intermediate PR values
  • tmp1 (/user/hdfs/pagerank/tmp1): temporary directory holding the adjacency matrix
  • tmp2 (/user/hdfs/pagerank/tmp2): temporary directory for the PR values computed in each iteration, which then replace input_pr
  • result (/user/hdfs/pagerank/result): output directory for the final PR values

Development steps:

  • Page link data: page.csv
  • Initial PR data: pr.csv
  • Adjacency matrix: AdjacencyMatrix.java
  • PageRank computation: PageRank.java
  • PR normalization: Normal.java
  • Driver program: PageRankJob.java

1). Page link data: page.csv

Create the file page.csv:

1,2
1,3
1,4
2,3
2,4
3,4
4,2

2). Initial PR data: pr.csv

Set the initial PR value of every page to 1.

Create the file pr.csv:

1,1
2,1
3,1
4,1

3). Adjacency matrix: AdjacencyMatrix.java

Notes on the matrix:

  • the damping factor is 0.85
  • the number of pages is 4
  • the reducer emits each column of the matrix as an output row; storing columns this way suits distributed storage, and the next step converts them back into rows (a worked example follows below)
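For example, using the data above: page 1 links out to pages 2, 3 and 4, so the reducer for key 1 receives the values {2, 3, 4}, builds A = [0, 1, 1, 1] with sum = 3, fills G with (1 − 0.85)/4 = 0.0375, and emits 0.0375 for row 1 and 0.0375 + 0.85 · 1/3 ≈ 0.32083333 for rows 2 to 4 — exactly the tmp1 line "1 0.037499994,0.32083333,0.32083333,0.32083333" shown in section 1.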

Create the program AdjacencyMatrix.java:

package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class AdjacencyMatrix {

    private static int nums = 4;    // number of pages
    private static float d = 0.85f; // damping factor

    public static class AdjacencyMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = PageRankJob.DELIMITER.split(values.toString());
            Text k = new Text(tokens[0]);
            Text v = new Text(tokens[1]);
            context.write(k, v);
        }
    }

    public static class AdjacencyMatrixReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            float[] G = new float[nums]; // column of the probability (damped) matrix
            Arrays.fill(G, (float) (1 - d) / G.length);

            float[] A = new float[nums]; // column of the adjacency matrix
            int sum = 0;                 // number of out-links
            for (Text val : values) {
                int idx = Integer.parseInt(val.toString());
                A[idx - 1] = 1;
                sum++;
            }

            if (sum == 0) { // the denominator must not be 0
                sum = 1;
            }

            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < A.length; i++) {
                sb.append("," + (float) (G[i] + d * A[i] / sum));
            }
            Text v = new Text(sb.toString().substring(1));
            System.out.println(key + ":" + v.toString());
            context.write(key, v);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("input");
        String input_pr = path.get("input_pr");
        String output = path.get("tmp1");
        String page = path.get("page");
        String pr = path.get("pr");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.mkdirs(input_pr);
        hdfs.copyFile(page, input);
        hdfs.copyFile(pr, input_pr);

        Job job = new Job(conf);
        job.setJarByClass(AdjacencyMatrix.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(AdjacencyMatrixMapper.class);
        job.setReducerClass(AdjacencyMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(page));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
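Running this job uploads page.csv and pr.csv to HDFS, then writes one line per page into tmp1, each line holding that page's matrix column — the column-per-row layout listed in section 1.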

4). PageRank computation: PageRank.java

Notes on the computation:

  • implements the multiplication of the adjacency matrix with the PR vector
  • the map stage keys its output by the matrix row number; since the previous step stored columns, they must be converted back into rows here
  • the reduce stage computes the unnormalized eigenvector entries (a worked example of one key follows below)
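To make the transpose concrete with the data from section 1: the tmp1 line "1 0.037499994,0.32083333,0.32083333,0.32083333" holds column 1 of the matrix, so the mapper emits (1, "A:1,0.0375"), (2, "A:1,0.3208333"), (3, "A:1,0.3208333") and (4, "A:1,0.3208333"), while a PR line such as "2 1" is broadcast as (i, "B:2,1") for i = 1..4. The reducer for key 2 therefore ends up with all of row 2 in mapA and the full PR vector in mapB, and computes pr(2) = 0.3208333·1 + 0.0375·1 + 0.0375·1 + 0.8875·1 ≈ 1.283333 — the value seen in the first iteration above.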

Create the file PageRank.java:

package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class PageRank {

    public static class PageRankMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;         // tmp1 or pr: which dataset the split comes from
        private static int nums = 4; // number of pages

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName(); // identify the input dataset
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());

            String[] tokens = PageRankJob.DELIMITER.split(values.toString());

            if (flag.equals("tmp1")) {
                String row = values.toString().substring(0, 1);
                String[] vals = PageRankJob.DELIMITER.split(values.toString().substring(2));
                // transpose the matrix: re-emit each stored column as a row
                for (int i = 0; i < vals.length; i++) {
                    Text k = new Text(String.valueOf(i + 1));
                    Text v = new Text(String.valueOf("A:" + (row) + "," + vals[i]));
                    context.write(k, v);
                }

            } else if (flag.equals("pr")) {
                for (int i = 1; i <= nums; i++) {
                    Text k = new Text(String.valueOf(i));
                    Text v = new Text("B:" + tokens[0] + "," + tokens[1]);
                    context.write(k, v);
                }
            }
        }
    }

    public static class PageRankReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<Integer, Float> mapA = new HashMap<Integer, Float>();
            Map<Integer, Float> mapB = new HashMap<Integer, Float>();
            float pr = 0f;

            for (Text line : values) {
                System.out.println(line);
                String vals = line.toString();
                if (vals.startsWith("A:")) {
                    String[] tokenA = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapA.put(Integer.parseInt(tokenA[0]), Float.parseFloat(tokenA[1]));
                }
                if (vals.startsWith("B:")) {
                    String[] tokenB = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapB.put(Integer.parseInt(tokenB[0]), Float.parseFloat(tokenB[1]));
                }
            }

            Iterator<Integer> iterA = mapA.keySet().iterator();
            while (iterA.hasNext()) {
                int idx = iterA.next();
                float A = mapA.get(idx);
                float B = mapB.get(idx);
                pr += A * B;
            }

            context.write(key, new Text(PageRankJob.scaleFloat(pr)));
            // System.out.println(key + ":" + PageRankJob.scaleFloat(pr));
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("tmp1");
        String output = path.get("tmp2");
        String pr = path.get("input_pr");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(PageRank.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(PageRankMapper.class);
        job.setReducerClass(PageRankReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input), new Path(pr));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);

        hdfs.rmr(pr);
        hdfs.rename(output, pr);
    }
}

5). PR normalization: Normal.java

Notes on the computation:

  • normalize the computed PR values so that all of them fall within the interval (0, 1)

Create the file Normal.java:

package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Normal {

    public static class NormalMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text k = new Text("1");

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            context.write(k, values); // send all PR rows to a single reducer
        }
    }

    public static class NormalReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            List<String> vList = new ArrayList<String>();
            float sum = 0f;
            for (Text line : values) {
                vList.add(line.toString());

                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                float f = Float.parseFloat(vals[1]);
                sum += f;
            }

            for (String line : vList) {
                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                Text k = new Text(vals[0]);

                float f = Float.parseFloat(vals[1]);
                Text v = new Text(PageRankJob.scaleFloat((float) (f / sum)));
                context.write(k, v);

                System.out.println(k + ":" + v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("input_pr");
        String output = path.get("result");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Normal.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(NormalMapper.class);
        job.setReducerClass(NormalReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

6). Driver program: PageRankJob.java

Create the file PageRankJob.java:

package org.conan.myhadoop.pagerank;

import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class PageRankJob {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        Map<String, String> path = new HashMap<String, String>();
        path.put("page", "logfile/pagerank/page.csv");           // local data file
        path.put("pr", "logfile/pagerank/pr.csv");               // local data file
        path.put("input", HDFS + "/user/hdfs/pagerank");         // HDFS directory
        path.put("input_pr", HDFS + "/user/hdfs/pagerank/pr");   // directory for the PR values
        path.put("tmp1", HDFS + "/user/hdfs/pagerank/tmp1");     // temporary directory for the adjacency matrix
        path.put("tmp2", HDFS + "/user/hdfs/pagerank/tmp2");     // temporary directory for the computed PR, which overwrites input_pr
        path.put("result", HDFS + "/user/hdfs/pagerank/result"); // final PR result

        try {
            AdjacencyMatrix.run(path);
            int iter = 3;
            for (int i = 0; i < iter; i++) { // run the PageRank job iteratively
                PageRank.run(path);
            }
            Normal.run(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() { // remote configuration for the Hadoop cluster
        JobConf conf = new JobConf(PageRankJob.class);
        conf.setJobName("PageRank");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static String scaleFloat(float f) { // keep 6 decimal places
        DecimalFormat df = new DecimalFormat("##0.000000");
        return df.format(f);
    }
}
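Note that iter is set to 3 in main(), while the walk-through in section 1 ran 10 iterations; raising iter to 10 should bring the output of Normal.java closer to the normalized values shown there.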

The source code has been uploaded to GitHub:

https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/pagerank

With this, we have a parallel implementation of PageRank. Next, we can use PageRank to build some interesting applications.

Please credit the source when reposting:
    http://blog.fens.me/algorithm-pagerank-mapreduce/
