将包含嵌入列表的 Spark DataFrame 转换为 Scala 中的 RDD

问题描述 投票:0回答:1

我有一个以下格式的数据框:

性格 标题
托尼·史塔克 [“钢铁侠”]
詹姆斯·布坎南·巴恩斯 [《美国队长:复仇者先锋》、《美国队长:冬日战士》、《美国队长:内战》、《复仇者联盟:无限战争》]
马库斯·布莱索 [《美国队长:冬兵》]

我的目标是创建一个 GraphX 表示,其中顶点是

Character
Title
,边代表角色出现在电影中的时间。这是一个示例数据集,实际数据会大得多,因此该解决方案必须可跨多个执行器扩展。

我是 Scala 和 Spark 的新手。我的策略是创建一个

characterVerticesRDD
movieVerticesRDD
,然后将它们组合在一起。

我相信这是构建

characterVerticesRDD
的正确方法:

val characterVerticesRDD: RDD[(VertexId, String)] = df.rdd.map(row => (MurmurHash3.stringHash(row.getString(0)), row.getString(0)))

以下是我的第一次天真的尝试。我现在意识到使用

Set
是无效的,因为它不能在执行器之间共享,并且使用
collect
在可扩展的解决方案中也不起作用。

val movieVertices = scala.collection.mutable.Set[(Long, String)]()
df.rdd.collect.foreach(row => {
    row.getAs[EmbeddedList]("title").elements.map { case d: String => d }.toList.foreach(movie => movieVertices += ((MurmurHash3.stringHash(movie), movie)))
})
val movieVerticesRDD: RDD[(VertexId, String)] = sc.parallelize(movieVertices.toList)

// combine vertices
val verticesRDD: RDD[(VertexId, String)] = characterVerticesRDD ++ movieVerticesRDD

考虑到我的 DataFrame 结构,构建此

movieVerticesRDD
的最佳方法是什么?我不知何故需要迭代电影标题来创建顶点。我认为创建边缘时的策略是类似的,因为我需要迭代数据帧的每一行来创建角色和电影之间的边缘。

感谢您的指导。

scala apache-spark spark-graphx
1个回答
0
投票

这应该可以解决问题。基本上,我们首先创建一个具有 id 的不同顶点的 RDD(我们使用

zipWithIndex
来生成它们)。然后,我们创建一个边(顶点元组)的数据框,并加入之前创建的 id。最后,我们将数据帧转换为 RDD,并使用我们创建的两个 RDD 创建图形。

// your data
val df = Seq(
    "Tony Stark" -> Seq("Iron Man"),
    "James Buchanan Barnes" -> Seq("Captain America: The First Avenger","Captain America: The Winter Soldier","Captain America: Civil War","Avengers: Infinity War"),
    "Marcus Bledsoe" -> Seq("Captain America: The Winter Soldier")
).toDF("character", "title")

// Movies and characters are vertices, creating a RDD of vertices and adding indices
val vertices = df
    .select(explode(concat(array('character), 'title)) as "x")
    .distinct.rdd.map(_.getAs[String](0))
    .zipWithIndex.map(_.swap)
// Dataframe of vertices (same as above)
val vertexDf = vertices.toDF("id", "node")
// Dataframe of edges.
val edgeDF = df
    .select('character, explode('title) as "title")
// RDD of edges. We need to join the vertex ids that we previously created.
val edges = edgeDF
    .join(vertexDf, edgeDF("character") === vertexDf("node"))
    .select('title, 'id as "character_id")
    .join(vertexDf, edgeDF("title") === vertexDf("node"))
    .rdd
    .map(row => Edge(row.getAs[Long]("character_id"), row.getAs[Long]("id"), None))
// And creating the graph
val graph = Graph(vertices, edges)
© www.soinside.com 2019 - 2024. All rights reserved.