我有一个以下格式的数据框:
性格 | 标题 |
---|---|
托尼·史塔克 | [“钢铁侠”] |
詹姆斯·布坎南·巴恩斯 | [《美国队长:复仇者先锋》、《美国队长:冬日战士》、《美国队长:内战》、《复仇者联盟:无限战争》] |
马库斯·布莱索 | [《美国队长:冬兵》] |
我的目标是创建一个 GraphX 表示,其中顶点是
Character
和 Title
,边代表角色出现在电影中的时间。这是一个示例数据集,实际数据会大得多,因此该解决方案必须可跨多个执行器扩展。
我是 Scala 和 Spark 的新手。我的策略是创建一个
characterVerticesRDD
、movieVerticesRDD
,然后将它们组合在一起。
我相信这是构建
characterVerticesRDD
的正确方法:
val characterVerticesRDD: RDD[(VertexId, String)] = df.rdd.map(row => (MurmurHash3.stringHash(row.getString(0)), row.getString(0)))
以下是我的第一次天真的尝试。我现在意识到使用
Set
是无效的,因为它不能在执行器之间共享,并且使用 collect
在可扩展的解决方案中也不起作用。
val movieVertices = scala.collection.mutable.Set[(Long, String)]()
df.rdd.collect.foreach(row => {
row.getAs[EmbeddedList]("title").elements.map { case d: String => d }.toList.foreach(movie => movieVertices += ((MurmurHash3.stringHash(movie), movie)))
})
val movieVerticesRDD: RDD[(VertexId, String)] = sc.parallelize(movieVertices.toList)
// combine vertices
val verticesRDD: RDD[(VertexId, String)] = characterVerticesRDD ++ movieVerticesRDD
考虑到我的 DataFrame 结构,构建此
movieVerticesRDD
的最佳方法是什么?我不知何故需要迭代电影标题来创建顶点。我认为创建边缘时的策略是类似的,因为我需要迭代数据帧的每一行来创建角色和电影之间的边缘。
感谢您的指导。
这应该可以解决问题。基本上,我们首先创建一个具有 id 的不同顶点的 RDD(我们使用
zipWithIndex
来生成它们)。然后,我们创建一个边(顶点元组)的数据框,并加入之前创建的 id。最后,我们将数据帧转换为 RDD,并使用我们创建的两个 RDD 创建图形。
// your data
val df = Seq(
"Tony Stark" -> Seq("Iron Man"),
"James Buchanan Barnes" -> Seq("Captain America: The First Avenger","Captain America: The Winter Soldier","Captain America: Civil War","Avengers: Infinity War"),
"Marcus Bledsoe" -> Seq("Captain America: The Winter Soldier")
).toDF("character", "title")
// Movies and characters are vertices, creating a RDD of vertices and adding indices
val vertices = df
.select(explode(concat(array('character), 'title)) as "x")
.distinct.rdd.map(_.getAs[String](0))
.zipWithIndex.map(_.swap)
// Dataframe of vertices (same as above)
val vertexDf = vertices.toDF("id", "node")
// Dataframe of edges.
val edgeDF = df
.select('character, explode('title) as "title")
// RDD of edges. We need to join the vertex ids that we previously created.
val edges = edgeDF
.join(vertexDf, edgeDF("character") === vertexDf("node"))
.select('title, 'id as "character_id")
.join(vertexDf, edgeDF("title") === vertexDf("node"))
.rdd
.map(row => Edge(row.getAs[Long]("character_id"), row.getAs[Long]("id"), None))
// And creating the graph
val graph = Graph(vertices, edges)