我正在Spark中使用GraphX来处理图形。我有一个val common_neighbors: RDD[VertexId]
,其中包含一些vertexId。我使用地图功能将其转换为诸如(node,1)的结构,该节点是顶点的ID,而1是其初始属性。转换代码如下:
val p =common_neighbors.map(x=>(x,1))
我有一个图,其结构如下:(node,node_property(label,isDefined))。例如(1,(14,true))。这意味着ID = 1的节点具有label = 14和isDefined。
我想以并行和分布式的方式转换p中的节点属性,如果图中的节点标签大于5,则代码如下:
val x=p.map(node=>{
val temp_property=graph.vertices.filter(x=>x._1==node._1).values.take(1)
if(temp_property(0).label > 5) {
(node._1,((temp_property(0).label)+5))
}
})
但是当我执行代码时,出现错误。这是什么问题?我该如何解决?
据我所知,您正在另一个RDD中使用一个RDD,那不是真的。除此之外,您可以这样做:您可以使用以下代码将common_neighbors与真实图形连接起来:
val new_val=common_neighbors.join(work_graph.vertices)
然后通过一些映射转换,可以在new_val中建立图形的结构,然后可以使用map或mapValues对new_val的值进行所有操作