我想计算三角形的计数问题,其中我的输入是follower,followee。我使用广播连接来计算这个三角形,将边设置为
edges.collectAsMap()
以广播到所有节点。
val spark = SparkSession.builder()
.appName("Combining in Spark")
.config("spark.master", "local")
.getOrCreate()
import spark.implicits._
val edgesRDD = spark.sparkContext.textFile(args(0))
.map(line => {
val parts = line.split(",")
(parts(0).toInt, parts(1).toInt) // (follower, user)
})
.filter { case (follower, user) => follower < maxValue && user < maxValue }
val broadcastEdges = spark.sparkContext.broadcast(edgesRDD.collectAsMap())
val trianglesRDD = edgesRDD
.flatMap { case (a, b) =>
broadcastEdges.value.get(b) match {
case Some(c) if broadcastEdges.value.contains(c) && broadcastEdges.value(c) == a => Seq((a, b, c))
case _ => Seq.empty
}
}
val triangleCount = trianglesRDD.count() / 3
这段代码可能有什么问题?
我的输入数据是
1,2 2,3 2,4, 3,1
。我的输出应该是 (1,2,3) (2,1,3)
和 (3,1,2)
,因为这 3 个节点可以被圈出。但我只得到 (2,3,1)
,而如果我的输入数据是 1,2 2,3 2,4 4,1
,我可以获得正确的输出,即 (1,2,4) (2,1,4) (2,4,1)
。
收集为地图以供输入
1 -> 2 : 2 -> 3 : 2 -> 4 : 3 ->1
edgesRDD.collectAsMap() => Map(2 -> 4, 1 -> 2, 3 -> 1)
你希望它看起来像
Map(2 -> 3, 1 -> 2, 3 -> 1)
地图键
2
在您的输入中出现两次,因此它将 2 -> 3
覆盖为 2 ->4
,从而中断连接。如果您删除输入对 (2,4)
或按照 (2,3)
之前的顺序将其向上移动,那么它应该可以工作。
在第二种情况下,对于输入
1,2 2,3 2,4 4,1
,(2,4)
出现在(2,3)
之后,因此覆盖仍然保持连接。最终地图看起来像
Map(2 -> 4, 4 -> 1, 1 -> 2)
因此效果很好。