我有2个大dataframes,edge
和vertex
,我知道他们需要在特殊类型Vertex
和Edge
RDDS,但每次我发现教程指定Edge
和Vertex
RDDS为3至10个项目的数组。我需要他们直接从大幅RDD转换。我将如何改变一个数据帧/正常RDD为正确的类型?
我在这里遵循的例子:https://spark.apache.org/docs/latest/graphx-programming-guide.html#example-property-graph但它枚举所有的关系,也有很多在我的使用情况。
edge
DF有3列,(的sourceID,destID,关系)vertex
DF有2列(ID,姓名)我迄今为止尝试:
val vertex: RDD[(VertexId, String)] = sc.parallelize((vertexDF("ID"), vertexDF("Name")))
返回错误:
error: type mismatch;
found : (org.apache.spark.sql.Column, org.apache.spark.sql.Column)
required: Seq[(org.apache.spark.graphx.VertexId, String)]
(which expands to) Seq[(Long, String)]
我会怎样改变一个数据帧/正常RDD到专门的顶点/边RDD类型?
有一个graphframes火花库来处理基于数据帧的图形。它有一个边缘和顶点数据帧对转换成GraphX RDD的方法。请参阅:http://graphframes.github.io/graphframes/docs/_site/user-guide.html#example-conversions。
对于你的榜样它会是这样的:
val edgeDf = .... // (sourceID, destID, relationship)
val verexDf = .... // (ID, Name)
import org.graphframes._
val g = GraphFrame(
verexDf.select($"id", $"name"),
edgeDf.select ($"sourceID" as "src", $"destID" as "dst", $"relationship"))
// Convert to GraphX
val gx: Graph[Row, Row] = g.toGraphX