与 Spark 广播连接 RDD 相关的问题,并在进行三角形计算时广播 RDD.collectAsMap()

问题描述 投票:0回答:1

我想计算三角形的计数问题,其中我的输入是follower,followee。我使用广播连接来计算这个三角形,将边设置为

edges.collectAsMap()
以广播到所有节点。

val spark = SparkSession.builder()
  .appName("Combining in Spark")
  .config("spark.master", "local")
  .getOrCreate()

import spark.implicits._
val edgesRDD = spark.sparkContext.textFile(args(0))
  .map(line => {
    val parts = line.split(",")
    (parts(0).toInt, parts(1).toInt)  // (follower, user)
  })
  .filter { case (follower, user) => follower < maxValue && user < maxValue }

val broadcastEdges = spark.sparkContext.broadcast(edgesRDD.collectAsMap())

val trianglesRDD = edgesRDD
  .flatMap { case (a, b) =>
    broadcastEdges.value.get(b) match {
      case Some(c) if broadcastEdges.value.contains(c) && broadcastEdges.value(c)             == a => Seq((a, b, c))
      case _ => Seq.empty
    }
  }

val triangleCount = trianglesRDD.count() / 3

这段代码可能有什么问题?

我的输入数据是

1,2 2,3 2,4, 3,1
。我的输出应该是
(1,2,3) (2,1,3)
(3,1,2)
,因为这 3 个节点可以被圈出。但我只得到
(2,3,1)
,而如果我的输入数据是
1,2 2,3 2,4 4,1
,我可以获得正确的输出,即
(1,2,4) (2,1,4) (2,4,1)

scala apache-spark
1个回答
0
投票

收集为地图以供输入

1 -> 2 : 2 -> 3 : 2 -> 4 : 3 ->1

edgesRDD.collectAsMap()  =>  Map(2 -> 4, 1 -> 2, 3 -> 1)

你希望它看起来像

Map(2 -> 3, 1 -> 2, 3 -> 1)

地图键

2
在您的输入中出现两次,因此它将
2 -> 3
覆盖为
2 ->4
,从而中断连接。如果您删除输入对
(2,4)
或按照
(2,3)
之前的顺序将其向上移动,那么它应该可以工作。

在第二种情况下,对于输入

1,2 2,3 2,4 4,1
(2,4)
出现在
(2,3)
之后,因此覆盖仍然保持连接。最终地图看起来像

Map(2 -> 4, 4 -> 1, 1 -> 2)

因此效果很好。

© www.soinside.com 2019 - 2024. All rights reserved.