700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 大数据基础之Spark——Spark pregel详细过程 一看就懂

大数据基础之Spark——Spark pregel详细过程 一看就懂

时间:2021-07-11 04:33:46

相关推荐

大数据基础之Spark——Spark pregel详细过程 一看就懂

Pregel概述

Pregel是Google提出的用于大规模分布式图计算框架

  - 图遍历(BFS)

  - 单源最短路径(SSSP)

  - PageRank计算Pregel的计算由一系列迭代组成,称为superstepsPregel迭代过程

  - 每个顶点从上一个superstep接收入站消息

  - 计算顶点新的属性值

  - 在下一个superstep中想相邻的顶点发送消息

  - 当没有剩余消息是,迭代结束

Pregel原理分析

pregel函数源码以及各个参数的简介:

def pregel[A: ClassTag](initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}

案例:求5顶点到1顶点的最短距离

顶点的两个状态:

  - 钝化态:类似于休眠,不做任何处理

  - 激活态:可以进行数据的接受和发送顶点能够处于激活状态需要的条件

  - 成功收到消息

  - 成功发送任何一条消息

首先,给5节点的初始值为0,其他值的初始值为正无穷大

然后从5节点出发,每次用发送方的值+权重和接受方的值相比较,取小值作为接受方的值,然后再以此方式发送给下一节点,如果发送方的值+权重大于接受方的值,则无法发送给下一节点。根据这种方法得到的结果如下图:

最终得到的5顶点到1顶点的最短距离为15。

代码实现:

val conf = new SparkConf().setMaster("local[2]").setAppName("mytst")val sc = SparkContext.getOrCreate(conf)val vect = sc.parallelize(Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50))))val edges = sc.parallelize(Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3)))val graphx = Graph(vect,edges)// 设置起始顶点val srcVectId = 5Lval initialGraph = graphx.mapVertices({case (vid,(name,age))=>if (vid==5L) 0.0 else Double.PositiveInfinity})// 调用pregelval pregelGraph = initialGraph.pregel(Double.PositiveInfinity, //每个点的初始值,无穷大Int.MaxValue,//最大迭代次数EdgeDirection.Out//发送信息的方向)( //vprog(接受到的消息和自己的消息进行合并)(vid:VertexId,vd:Double,distMsg:Double)=>{val minDist = math.min(vd,distMsg)println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")minDist},(edgeTriplet:EdgeTriplet[Double,PartitionID])=>{//发送消息,如果自己的消息+权重<目的地的消息,则发送if(edgeTriplet.srcAttr+edgeTriplet.attr<edgeTriplet.dstAttr){println(s"顶点${edgeTriplet.srcId} 给顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator[(VertexId,Double)]((edgeTriplet.dstId,edgeTriplet.srcAttr+edgeTriplet.attr))}else{Iterator.empty}},(msg1:Double,msg2:Double)=>math.min(msg1,msg2) //多条接收消息,mergeMessage,取小合并多条消息)// 输出结果pregelGraph.triplets.foreach(println)println(pregelGraph.vertices.collect.mkString(","))// 关闭资源sc.stop()

迭代结果展示:

//------------------------------------------ 各个顶点接受初始消息initialMsg ------------------------------------------

顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity

顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity

顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity

顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity

顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity

顶点5,属性0.0,收到消息Infinity,合并后的属性0.0

//------------------------------------------ 第一次迭代 ------------------------------------------

顶点5 给 顶点6 发送消息 3.0

顶点5 给 顶点3 发送消息 8.0

顶点3,属性Infinity,收到消息8.0,合并后的属性8.0

顶点6,属性Infinity,收到消息3.0,合并后的属性3.0

//------------------------------------------ 第二次迭代 ------------------------------------------

顶点3 给 顶点2 发送消息 12.0

顶点2,属性Infinity,收到消息12.0,合并后的属性12.0

//------------------------------------------ 第三次迭代 ------------------------------------------

顶点2 给 顶点4 发送消息 14.0

顶点2 给 顶点1 发送消息 19.0

顶点1,属性Infinity,收到消息19.0,合并后的属性19.0

顶点4,属性Infinity,收到消息14.0,合并后的属性14.0

//------------------------------------------ 第四次迭代 ------------------------------------------

顶点4 给 顶点1 发送消息 15.0

顶点1,属性19.0,收到消息15.0,合并后的属性15.0

//------------------------------------------ 第五次迭代不用发送消息 ------------------------------------------

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。