日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

sparkGraphX 图操作:pregel(加强的aggregateMessages)

發(fā)布時間:2024/3/13 编程问答 74 豆豆
生活随笔 收集整理的這篇文章主要介紹了 sparkGraphX 图操作:pregel(加强的aggregateMessages) 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

目錄

1、Pregel API:

2、代碼實現(xiàn):

使用pregal實現(xiàn)找出源頂點到每個節(jié)點最小花費

使用pregel實現(xiàn)找出源節(jié)點到每個節(jié)點的最大深度


1、Pregel API:

圖本身就是內在的遞歸的數(shù)據(jù)結構,因為一個頂點的屬性可能依賴于其neighbor,而neighbor的屬性又依賴于他們的neighbour。所以很多重要的圖算法都會迭代計算每個頂點的屬性,直到達到一個穩(wěn)定狀態(tài)。

GraphX中的Pregel操作符是一個批量同步并行(bulk-synchronous parallel message abstraction)的messaging abstraction,用于圖的拓撲結構(topology of the graph)。The Pregel operator executes in a series of super steps in whichvertices receive the sum of their inbound messagesfrom the previous super step,compute a new valuefor the vertex property, and thensend messages to neighboring verticesin the next super step. Message是作為edge triplet的一個函數(shù)并行計算的,message的計算可以使用source和dest頂點的屬性。沒有收到message的頂點在super step中被跳過。迭代會在么有剩余的信息之后停止,并返回最終的圖。

pregel的定義:

def pregel[A]

????(initialMsg: A,//在第一次迭代中每個頂點獲取的起始

????msgmaxIter: Int = Int.MaxValue,//迭代計算的次數(shù)

????activeDir: EdgeDirection = EdgeDirection.Out

)(

????vprog: (VertexId, VD, A) => VD,//頂點的計算函數(shù),在每個頂點運行,根據(jù)頂點的ID,屬性和獲取的inbound message來計算頂點的新屬性值。頂一次迭代的時候,inbound message為initialMsg,且每個頂點都會執(zhí)行一遍該函數(shù)。以后只有上次迭代中接收到信息的頂點會執(zhí)行。

????sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],//應用于頂點的出邊(out edges)用于接收頂點發(fā)出的信息

????mergeMsg: (A, A) => A//合并信息的算法

)

算法實現(xiàn)的大致過程:

var g = mapVertices((vid, vdata) => vprog(vid, vdata, initMsg)).cache //第一步是根據(jù)initMsg在每個頂點執(zhí)行一次vprog算法,從而每個頂點的屬性都會迭代一次。

var messages = g.mapReduceTriplets(sendMsg, mergeMsg)

var messagesCount = messages.count

var i = 0

while(activeMessages > 0 && i < maxIterations){

????g = g.joinVertices(messages)(vprog).cache

????val oldMessages = messages

????messages = g.mapReduceTriplets(

????????sendMsg,

? ? ? ? mergeMsg,

????????Some((oldMessages, activeDirection))

????).cache()

????activeMessages = messages.count

????i += 1

}

g

pregel算法的一個實例:將圖跟一些一些初始的score做關聯(lián),然后將頂點分數(shù)根據(jù)出度大小向外發(fā)散,并自己保留一份:

//將圖中頂點添加上該頂點的出度屬性

val graphWithDegree = graph.outerJoinVertices(graph.outDegrees){

????case (vid, name, deg) => (name, deg match {

????????case Some(deg) => deg+0.0

????????case None => 1.25}

????)

}//將圖與初始分數(shù)做關聯(lián)

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

????case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))//將圖與初始分數(shù)做關聯(lián)

val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){

????case (vid, (name, deg), score) => (name,deg, score.getOrElse(0.0))

}

graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))

算法的第一步:將0.0(也就是傳入的初始值initMsg)跟各個頂點的值相加(還是原來的值),然后除以頂點的出度。這一步很重要,不能忽略。 并且在設計的時候也要考慮結果會不會被這一步所影響。

解釋來源:https://www.jianshu.com/p/d9170a0723e4

?

2、代碼實現(xiàn):

使用pregal實現(xiàn)找出源頂點到每個節(jié)點最小花費

package homeWorkimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.graphx.{Edge, Graph, VertexId} import org.apache.spark.graphx.util.GraphGeneratorsobject MapGraphX5 {def main(args: Array[String]): Unit = {//設置運行環(huán)境val conf = new SparkConf().setAppName("Pregel API GraphX").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")// 構建圖val myVertices = sc.parallelize(Array((1L, 0), (2L, 0), (3L, 0), (4L, 0),(5L, 0)))val myEdges = sc.makeRDD(Array(Edge(1L, 2L, 2.5),Edge(2L, 3L, 3.6), Edge(3L, 4L, 4.5),Edge(4L, 5L, 0.1), Edge(3L, 5L, 5.2)))val myGraph = Graph(myVertices, myEdges)//設置源頂點val sourceId: VertexId = 1L//初始化數(shù)據(jù)集,是源頂點就為0.0,不是就設置為double的正無窮大val initialGraph = myGraph.mapVertices((id, _) =>if (id == sourceId) 0.0 else Double.PositiveInfinity)/*def pregel[A](initialMsg : A,maxIterations : scala.Int = { /* compiled code */ },activeDirection : org.apache.spark.graphx.EdgeDirection = { /* compiled code */ })(vprog : scala.Function3[org.apache.spark.graphx.VertexId, VD, A, VD],sendMsg : scala.Function1[org.apache.spark.graphx.EdgeTriplet[VD, ED],scala.Iterator[scala.Tuple2[org.apache.spark.graphx.VertexId, A]]],mergeMsg : scala.Function2[A, A, A])(implicit evidence$6 : scala.reflect.ClassTag[A]): org.apache.spark.graphx.Graph[VD, ED] = { /* compiled code */ } */val sssp: Graph[Double, Double] = initialGraph.pregel(//initialMsDouble.PositiveInfinity//maxIterations和activeDirection使用默認值)(//vprog 更改數(shù)據(jù)集(id, dist, newDist) => math.min(dist, newDist),//sendMsgtriplet => { // Send Message//尋找1L頂點到每個頂點的最小花費if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {//滿足sum(起始頂點+邊值) 小于 終止頂點當前數(shù)據(jù)集中的值,就把sum發(fā)送給終止頂點,更新數(shù)據(jù)集的數(shù)據(jù)Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))} else {Iterator.empty}},//mergeMsg 選擇當前數(shù)據(jù)和發(fā)送數(shù)據(jù)的最小值傳送(a, b) => math.min(a, b))sssp.vertices.collect.foreach(println(_))} }

使用pregel實現(xiàn)找出源節(jié)點到每個節(jié)點的最大深度

package pregelimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.graphx.{Edge, EdgeDirection, Graph}object Demo2 {def main(args: Array[String]): Unit = {//設置運行環(huán)境val conf = new SparkConf().setAppName("Pregol Api GraphX").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")// 構建圖val myVertices = sc.parallelize(Array((1L, "張三"), (2L, "李四"), (3L, "王五"), (4L, "錢六"),(5L, "領導")))val myEdges = sc.makeRDD(Array( Edge(1L,2L,"朋友"),Edge(2L,3L,"朋友") , Edge(3L,4L,"朋友"),Edge(4L,5L,"上下級"),Edge(3L,5L,"上下級")))val myGraph = Graph(myVertices,myEdges)val g = myGraph.mapVertices((vid,vd)=>0)var newGraph: Graph[Int, String] = g.pregel(0)((id, attr, maxValue) => maxValue,triplet => { // Send Messageif (triplet.srcAttr + 1 > triplet.dstAttr) {Iterator((triplet.dstId, triplet.srcAttr + 1))} else {Iterator.empty}},(a: Int, b: Int) => math.max(a, b))newGraph.vertices.collect.foreach(println(_))}}


?

總結

以上是生活随笔為你收集整理的sparkGraphX 图操作:pregel(加强的aggregateMessages)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內容還不錯,歡迎將生活随笔推薦給好友。