Spark GraphX Graph Data Analysis (API, PageRank, Pregel)
GraphX API
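Basic graph concepts and terminology are covered in a separate introduction; this post uses examples to demonstrate them.
External dependencies: in addition to the basic Spark dependencies, the spark-graphx module must be added to the project.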
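As a rough sketch, the dependency could be declared in an sbt build like the one below. The Scala and Spark version numbers are assumptions chosen for illustration and do not come from the original post; use whatever versions match your environment.

```scala
// build.sbt (sketch; the version numbers are assumptions, adjust to your environment)
ThisBuild / scalaVersion := "2.12.18"

val sparkVersion = "3.5.1" // hypothetical choice, not from the original post

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"   % sparkVersion,
  "org.apache.spark" %% "spark-sql"    % sparkVersion, // only needed where SparkSession is used
  "org.apache.spark" %% "spark-graphx" % sparkVersion
)
```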
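- Build the relationship graph described below. The number in each circle is a vertex id, each vertex carries (name, job) as its attribute, and the attribute (weight) of each edge describes the relationship between the two vertices it connects.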
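Building a graph and inspecting its information
- Create the graph (the steps below run inside the main method)
// Imports (place these at the top of the file)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}
// Create the SparkContext
val conf = new SparkConf().setAppName("ghx").setMaster("local[*]")
val sc = new SparkContext(conf)
// Create all vertices as (id, (name, job))
val vertices = sc.makeRDD(Seq((3L, ("zl", "stu")), (5L, ("zs", "prof")), (2L, ("ls", "prof")), (7L, ("ww", "pstdoc"))))
// Create all edges as Edge(srcId, dstId, relationship)
val edges = sc.makeRDD(Seq(Edge(2L, 5L, "tongshi"), Edge(5L, 3L, "jiao"), Edge(5L, 7L, "pi"), Edge(3L, 7L, "wen")))
// Create the graph
val graph = Graph(vertices, edges)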
- View graph information
- View the number of edges and vertices
The resulting graph has 4 vertices and 4 directed edges.
println((graph.numEdges, graph.numVertices))
// Output
(4,4)
- View vertex and edge attributes
// Vertex attributes
graph.vertices.foreach(x => println((x._1, x._2)))
// Output (row order may vary between runs)
(2,(ls,prof))
(7,(ww,pstdoc))
(5,(zs,prof))
(3,(zl,stu))
// Edge attributes
graph.edges.foreach(x => println((x.srcId, x.dstId, x.attr)))
// Output
(5,7,pi)
(2,5,tongshi)
(3,7,wen)
(5,3,jiao)
// Use triplets to view each edge together with both of its endpoints
// srcId (source vertex id), srcAttr (source vertex attribute), dstId (destination vertex id), dstAttr (destination vertex attribute), attr (edge attribute)
graph.triplets.foreach(x => println((x.srcId, x.srcAttr, x.dstId, x.dstAttr, x.attr)))
// Output
(5,(zs,prof),3,(zl,stu),jiao)
(2,(ls,prof),5,(zs,prof),tongshi)
(5,(zs,prof),7,(ww,pstdoc),pi)
(3,(zl,stu),7,(ww,pstdoc),wen)
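- View out-degree and in-degree
- Out-degree: the number of edges that point from the current vertex to other vertices
- In-degree: the number of edges that point from other vertices to the current vertex
// Out-degree (vertices with no outgoing edge, such as 7, are not listed)
graph.outDegrees.foreach(x => println(x))
// Output
(5,2)
(2,1)
(3,1)
// In-degree (vertices with no incoming edge, such as 2, are not listed)
graph.inDegrees.foreach(x => println(x))
// Output
(3,1)
(5,1)
(7,2)
// Sum of out-degree and in-degree
graph.degrees.foreach(x => println(x))
// Output
(2,1)
(5,3)
(3,2)
(7,2)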
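The degree definitions above can be cross-checked without Spark. The following is a minimal plain-Scala sketch, not GraphX code: the object name `DegreeSketch` and its helper functions are hypothetical, and the edge list simply restates the four edges of the example graph as (srcId, dstId) pairs.

```scala
// Plain-Scala cross-check of the degree definitions (no Spark required).
// DegreeSketch and its helpers are hypothetical names used only for illustration.
object DegreeSketch {
  // The four directed edges of the example graph as (srcId, dstId) pairs.
  val edges: Seq[(Long, Long)] = Seq((2L, 5L), (5L, 3L), (5L, 7L), (3L, 7L))

  // Out-degree: count the edges grouped by their source vertex.
  def outDegrees(es: Seq[(Long, Long)]): Map[Long, Int] =
    es.groupBy(_._1).map { case (id, group) => id -> group.size }

  // In-degree: count the edges grouped by their destination vertex.
  def inDegrees(es: Seq[(Long, Long)]): Map[Long, Int] =
    es.groupBy(_._2).map { case (id, group) => id -> group.size }

  // Degree: out-degree plus in-degree for every vertex that touches an edge.
  def degrees(es: Seq[(Long, Long)]): Map[Long, Int] = {
    val out = outDegrees(es)
    val in = inDegrees(es)
    (out.keySet ++ in.keySet)
      .map(id => id -> (out.getOrElse(id, 0) + in.getOrElse(id, 0)))
      .toMap
  }

  def main(args: Array[String]): Unit = {
    println(s"out-degrees: ${outDegrees(edges)}")
    println(s"in-degrees:  ${inDegrees(edges)}")
    println(s"degrees:     ${degrees(edges)}")
  }
}
```

As with the GraphX results, a vertex that has no outgoing (or incoming) edge is simply absent from the corresponding map rather than listed with a count of 0.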
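Graph operators
- Property operators (map)
// Change vertex values (normally only the vertex attribute is changed)
// The two forms below are equivalent: each keeps only name and appends "hello".
// The function returns just the new attribute; the vertex id is preserved automatically.
graph.mapVertices { case (vertexId, (name, job)) => name + "hello" }
graph.mapVertices((vertexId, attr) => attr._1 + "hello")
// Change edge values: the function returns only the new edge attribute
graph.mapEdges(e => e.attr + ",Hello")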
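- Structural operators
// Reverse the direction of every edge
graph.reverse
// Build the subgraph whose vertices and edges satisfy the given predicates;
// here only vertices whose job contains "pro" are kept, together with the edges between them
graph.subgraph(vpred = (id, attr) => attr._2.contains("pro"))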
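To make the effect of the structural operators concrete, here is a small sketch that rebuilds the example graph and collects the edges that remain after `reverse` and after `subgraph`. The object name `SubgraphSketch` and its `run` helper are hypothetical; the GraphX calls themselves (`reverse`, `subgraph` with the `vpred` parameter) are the ones used above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// SubgraphSketch is a hypothetical name used only for illustration.
object SubgraphSketch {
  // Returns the (srcId, dstId) pairs left after reverse and after subgraph.
  def run(sc: SparkContext): (Set[(Long, Long)], Set[(Long, Long)]) = {
    val vertices = sc.makeRDD(Seq(
      (3L, ("zl", "stu")), (5L, ("zs", "prof")), (2L, ("ls", "prof")), (7L, ("ww", "pstdoc"))))
    val edges = sc.makeRDD(Seq(
      Edge(2L, 5L, "tongshi"), Edge(5L, 3L, "jiao"), Edge(5L, 7L, "pi"), Edge(3L, 7L, "wen")))
    val graph = Graph(vertices, edges)

    // reverse flips every edge: 2->5 becomes 5->2, and so on.
    val reversed = graph.reverse.edges.map(e => (e.srcId, e.dstId)).collect().toSet

    // vpred keeps only the vertices whose job contains "pro" (2 and 5);
    // an edge survives only if both of its endpoints survive.
    val profOnly = graph
      .subgraph(vpred = (id, attr) => attr._2.contains("pro"))
      .edges.map(e => (e.srcId, e.dstId)).collect().toSet

    (reversed, profOnly)
  }

  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("subgraph-sketch").setMaster("local[*]"))
    val (reversed, profOnly) = run(sc)
    println(s"edges after reverse: $reversed")
    println(s"edges between prof vertices: $profOnly")
    sc.stop()
  }
}
```

For this graph only the edge 2->5 connects two "prof" vertices, so it is the only edge left in the subgraph.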
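- Join operators
// joinVertices merges an RDD of new values into the vertices of the existing graph.
// For ids present on both sides the attribute values are concatenated; ids that are not in the graph (4 here) are ignored.
val newPoints = sc.parallelize(Array((3L, "hehe"), (5L, "xixi"), (4L, "cici")))
graph.joinVertices(newPoints)((id, src, newVal) => (src._1 + "@" + newVal, src._2)).vertices.foreach(f => println(f._2))
// Output
(ww,pstdoc)
(ls,prof)
(zs@xixi,prof)
(zl@hehe,stu)
// There is also outerJoinVertices, which behaves like an outer join: every vertex is passed an Option that is None when the RDD has no value for its id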
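A minimal sketch of `outerJoinVertices` on the same data may help show how it differs from `joinVertices`. The object name `OuterJoinSketch`, the `run` helper, and the placeholder string "none" are assumptions made for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// OuterJoinSketch is a hypothetical name used only for illustration.
object OuterJoinSketch {
  // Returns vertex id -> (name joined with the extra value, job).
  def run(sc: SparkContext): Map[Long, (String, String)] = {
    val vertices = sc.makeRDD(Seq(
      (3L, ("zl", "stu")), (5L, ("zs", "prof")), (2L, ("ls", "prof")), (7L, ("ww", "pstdoc"))))
    val edges = sc.makeRDD(Seq(
      Edge(2L, 5L, "tongshi"), Edge(5L, 3L, "jiao"), Edge(5L, 7L, "pi"), Edge(3L, 7L, "wen")))
    val graph = Graph(vertices, edges)
    val newPoints = sc.parallelize(Seq((3L, "hehe"), (5L, "xixi"), (4L, "cici")))

    // Every vertex of the graph is visited. `extra` is Some(value) when
    // newPoints contains the vertex id and None otherwise.
    // "none" is an arbitrary placeholder chosen for this sketch.
    val joined = graph.outerJoinVertices(newPoints) { (id, attr, extra) =>
      (attr._1 + "@" + extra.getOrElse("none"), attr._2)
    }
    joined.vertices.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    val sc = SparkContext.getOrCreate(
      new SparkConf().setAppName("outer-join-sketch").setMaster("local[*]"))
    run(sc).foreach(println)
    sc.stop()
  }
}
```

Unlike `joinVertices`, the mapping function may return a different attribute type, because unmatched vertices are handled explicitly through the `Option` instead of keeping their old value. The id 4 is still dropped, since it does not exist in the graph.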
PageRank application
Use PageRank to find the most important users in a social network.
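import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

object myGrp extends App {
  val spark = SparkSession.builder().master("local[1]").appName("users").getOrCreate()
  // Users as vertices
  val tweeters = Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)))
  val vects = spark.sparkContext.parallelize(tweeters)
  // Follow relations as edges: Edge(srcId, dstId, weight)
  val followRelations = Array(Edge[Int](2L, 1L, 7), Edge[Int](2L, 4L, 2), Edge[Int](3L, 2L, 4), Edge[Int](3L, 6L, 3), Edge[Int](4L, 1L, 1), Edge[Int](5L, 2L, 2), Edge[Int](5L, 3L, 8), Edge[Int](5L, 6L, 3))
  val edges = spark.sparkContext.parallelize(followRelations)
  val graph = Graph(vects, edges)
  // Run PageRank until the ranks converge within a tolerance of 0.001
  val ranks = graph.pageRank(0.001)
  // Collect to the driver before printing so the descending order is preserved
  ranks.vertices.sortBy(_._2, ascending = false).collect().foreach(println)
  spark.stop()
}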
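The PageRank result contains only vertex ids and scores. The sketch below, under the same data, joins the scores back to the user attributes so the ranking can be read by name. The object name `RankedUsersSketch` and the helper `rankedNames` are hypothetical; `pageRank` and the pair-RDD `join` are the standard calls.

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

// RankedUsersSketch is a hypothetical name used only for illustration.
object RankedUsersSketch {
  // Returns user names ordered from the highest to the lowest PageRank score.
  def rankedNames(spark: SparkSession): Seq[String] = {
    val sc = spark.sparkContext
    val users = sc.parallelize(Seq(
      (1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)),
      (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50))))
    val follows = sc.parallelize(Seq(
      Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
      Edge(4L, 1L, 1), Edge(5L, 2L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)))
    val graph = Graph(users, follows)

    // pageRank(tol) keeps iterating until the ranks change by less than tol.
    val ranks = graph.pageRank(0.001).vertices

    // Join the scores with the user attributes so names appear in the result.
    users.join(ranks)
      .map { case (_, ((name, _), rank)) => (name, rank) }
      .sortBy(_._2, ascending = false)
      .collect() // bring the sorted rows to the driver in order
      .map(_._1)
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("ranked-users").getOrCreate()
    rankedNames(spark).foreach(println)
    spark.stop()
  }
}
```

In this graph Alice is pointed to by both Bob and David, so she comes out on top, while Ed, whom nobody points to, comes last. The exact scores depend on the Spark version and are therefore not shown here.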
Using Pregel to find shortest distances
Find the shortest distance from vertex 5 to every other vertex.
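import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, EdgeDirection, EdgeTriplet, Graph, VertexId}

object mypregel extends App {
  // Create the SparkContext
  val conf = new SparkConf().setMaster("local[*]").setAppName("pregel")
  val sc = SparkContext.getOrCreate(conf)
  // Create the vertices
  val vertexs = sc.makeRDD(Seq((1L, "park"), (2L, "company"), (3L, "restaurant"),
    (4L, "hospital"), (5L, "home"), (6L, "school")))
  // Create the edges; the Int attribute is the distance
  val edges = sc.makeRDD(Seq(Edge(2L, 1L, 7), Edge(2L, 4L, 2), Edge(3L, 2L, 4), Edge(3L, 6L, 3),
    Edge(4L, 1L, 1), Edge(2L, 5L, 2), Edge(5L, 3L, 8), Edge(5L, 6L, 3)))
  // Create the graph
  val graph = Graph(vertexs, edges)
  // Compute the shortest distance from vertex 5 to every other vertex
  // Id of the source vertex
  val srcVertexId = 5L
  // The source vertex starts at 0; every other vertex starts at positive infinity
  val initialGraph = graph.mapVertices {
    case (id, destination) => if (id == srcVertexId) 0.0 else Double.PositiveInfinity
  }
  // Call pregel
  val pregelGraph = initialGraph.pregel(
    Double.PositiveInfinity, // initial message sent to every vertex
    Int.MaxValue,            // maximum number of iterations
    EdgeDirection.Out        // send along the out-edges of vertices that received a message
  )(
    // Vertex program: keep the smaller of the current distance and the received one
    (id: VertexId, vd: Double, distMsg: Double) => {
      val minDist = math.min(vd, distMsg)
      println(s"vertex ${id}, attribute ${vd}, received message ${distMsg}, merged attribute ${minDist}")
      minDist
    },
    // Send message: notify the destination only if the path through this edge is shorter
    (edgeTriplet: EdgeTriplet[Double, Int]) => {
      if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
        println(s"vertex ${edgeTriplet.srcId} sends vertex ${edgeTriplet.dstId} the message ${edgeTriplet.srcAttr + edgeTriplet.attr}")
        Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
      } else {
        Iterator.empty
      }
    },
    // Merge messages: when several arrive at one vertex, keep the minimum
    (msg1: Double, msg2: Double) => math.min(msg1, msg2)
  )
  // Print the result
  pregelGraph.triplets.collect().foreach(println)
  println(pregelGraph.vertices.collect().mkString(" "))
  sc.stop()
}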
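The distances produced by the Pregel job can be cross-checked without Spark. The sketch below swaps in a plain Bellman-Ford style relaxation over the same edge list; it is not the Pregel API, and the object name `ShortestPathSketch` and its helper are hypothetical.

```scala
// Plain-Scala cross-check of the shortest distances (no Spark required).
// This is a Bellman-Ford style relaxation, not the Pregel API.
// ShortestPathSketch is a hypothetical name used only for illustration.
object ShortestPathSketch {
  // (srcId, dstId, weight): the same edges as in the Pregel example.
  val edges: Seq[(Long, Long, Double)] = Seq(
    (2L, 1L, 7.0), (2L, 4L, 2.0), (3L, 2L, 4.0), (3L, 6L, 3.0),
    (4L, 1L, 1.0), (2L, 5L, 2.0), (5L, 3L, 8.0), (5L, 6L, 3.0))

  def shortestDistances(source: Long, es: Seq[(Long, Long, Double)]): Map[Long, Double] = {
    val ids = es.flatMap { case (s, d, _) => Seq(s, d) }.distinct

    // The source starts at 0 and every other vertex at infinity,
    // mirroring the initial graph of the Pregel version.
    var dist: Map[Long, Double] =
      ids.map(id => id -> (if (id == source) 0.0 else Double.PositiveInfinity)).toMap

    // Relax every edge repeatedly; |V| - 1 rounds are enough when
    // there are no negative cycles.
    for (_ <- 1 until ids.size; (s, d, w) <- es) {
      if (dist(s) + w < dist(d)) dist = dist.updated(d, dist(s) + w)
    }
    dist
  }

  def main(args: Array[String]): Unit = {
    // Print the distances from vertex 5, ordered by vertex id.
    shortestDistances(5L, edges).toSeq.sortBy(_._1).foreach(println)
  }
}
```

Starting from vertex 5, the relaxation yields 15.0 for vertex 1, 12.0 for vertex 2, 8.0 for vertex 3, 14.0 for vertex 4, 0.0 for vertex 5, and 3.0 for vertex 6, which is what the vertex attributes of `pregelGraph` should show as well.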