How do I build a graph from a DataFrame? (GraphX)

Question (votes: -1, answers: 1)

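I'm new to Scala and Spark, and I need to build a graph from a DataFrame. This is the structure of my DataFrame, where S and O are the nodes and column P represents the edges.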

+---------------------------+---------------------+----------------------------+
|S                          |P                    |O                           |
+---------------------------+---------------------+----------------------------+
|http://website/Jimmy_Carter|http://web/name      |James Earl Carter           |
|http://website/Jimmy_Car   |http://web/country   |http://website/United_States|
|http://website/Jimmy_Car   |http://web/birthPlace|http://web/Georgia_(US)     |
+---------------------------+---------------------+----------------------------+

This is the code that builds the DataFrame. I want to create a graph from the DataFrame "dfA".

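 import scala.util.Try
 import org.apache.spark.sql.functions.regexp_extract
 import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

 // Assumed definition: the post does not show the Triple case class.
 // The field names are inferred from the columns used below.
 case class Triple(Subject: Option[String], Predicate: Option[String], Object: Option[String])

 // Split each line on spaces; Try(...).toOption yields None when a field is missing.
 val test = sc
     .textFile("testfile.ttl")
     .map(_.split(" "))
     .map(p => Triple(Try(p(0)).toOption,
                      Try(p(1)).toOption,
                      Try(p(2)).toOption))
     .toDF()

 // Strip the surrounding quotes or angle brackets from each field.
 val url_regex = """^(?:"|<{1}\s?)(.*)(?:>(?:\s\.)?|,\s.*)$"""
 val dfA = test
     .withColumn("Subject", regexp_extract($"Subject", url_regex, 1))
     .withColumn("Predicate", regexp_extract($"Predicate", url_regex, 1))
     .withColumn("Object", regexp_extract($"Object", url_regex, 1))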
scala apache-spark dataframe graph spark-graphx
1 answer
2 votes

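To create a GraphX graph, you need to extract the vertices from your DataFrame and associate each of them with an ID. Then you need to extract the edges (a pair of vertex IDs plus metadata) using those IDs. All of this has to be in RDDs, not DataFrames.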

In other words, you need an RDD[(VertexId, X)] for the vertices and an RDD[Edge[Y]] for the edges, where X is the vertex metadata and Y is the edge metadata. Note that VertexId is just an alias for Long.
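To make the two types concrete, here is a minimal sketch that builds them by hand for a single triple from the question. The local SparkSession, the app name and the hard-coded IDs are assumptions made only for this illustration. Here X and Y are both String.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical local session, only for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("graphx-types").getOrCreate()
val sc = spark.sparkContext

// Vertices: (id, metadata) pairs. Here the metadata is the node label.
val vertices: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (0L, "http://website/Jimmy_Carter"),
  (1L, "James Earl Carter")
))

// Edges: source ID, destination ID, metadata. Here the metadata is the predicate.
val edges: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(0L, 1L, "http://web/name")
))

// The graph is typed by the vertex and edge metadata: Graph[X, Y].
val graph: Graph[String, String] = Graph(vertices, edges)
```

The rest of the work is producing these two RDDs from the DataFrame instead of writing them by hand.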

In your case, with "S" and "O" as the vertex columns and "P" as the edge column, it would look like this.

import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{array, col, explode}
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

// `df` stands for your DataFrame with the columns S, P and O.

// Let's create the vertex RDD.
val vertices: RDD[(VertexId, String)] = df
    .select(explode(array(col("S"), col("O")))) // S and O are the vertices
    .distinct() // we remove duplicates
    .rdd.map(_.getAs[String](0)) // transform to RDD
    .zipWithIndex() // associate a Long index with each vertex
    .map(_.swap)
    .cache() // keep the IDs stable if the RDD is evaluated more than once

// Now let's define a vertex DataFrame because joins are clearer in Spark SQL
val vertexDf = vertices.toDF("id", "node")

// And let's extract the edges and join their vertices with their respective IDs
val edges: RDD[Edge[String]] = df
    .join(vertexDf.select(col("id").as("idS"), col("node").as("S")), "S") // getting the IDs for "S"
    .join(vertexDf.select(col("id").as("idO"), col("node").as("O")), "O") // getting the IDs for "O"
    .rdd.map(row => // creating the edge using column "P" as metadata
      Edge(row.getAs[Long]("idS"), row.getAs[Long]("idO"), row.getAs[String]("P")))

// And finally
val graph = Graph(vertices, edges)
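To check the result, you can run the same steps on a couple of rows and read the edges back as (S, P, O) triplets. The sketch below is only an illustration: the local SparkSession, the two sample rows and the helper name buildGraph are assumptions, and the joins are written by column name so that each lookup against the vertex table is unambiguous.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array, col, explode}

// Hypothetical local session and sample rows, only for this sketch.
val spark = SparkSession.builder().master("local[*]").appName("graphx-check").getOrCreate()
import spark.implicits._

val df = Seq(
  ("http://website/Jimmy_Carter", "http://web/name", "James Earl Carter"),
  ("http://website/Jimmy_Carter", "http://web/country", "http://website/United_States")
).toDF("S", "P", "O")

// Hypothetical helper: expects a DataFrame with string columns S, P and O.
def buildGraph(df: DataFrame): Graph[String, String] = {
  // One ID per distinct value found in S or O.
  val vertices: RDD[(VertexId, String)] = df
    .select(explode(array(col("S"), col("O"))))
    .distinct()
    .rdd.map(_.getAs[String](0))
    .zipWithIndex()
    .map(_.swap)
    .cache()
  val vertexDf = vertices.toDF("id", "node")
  // Look up the ID of S, then the ID of O, and keep P as the edge attribute.
  val edges: RDD[Edge[String]] = df
    .join(vertexDf.select(col("id").as("idS"), col("node").as("S")), "S")
    .join(vertexDf.select(col("id").as("idO"), col("node").as("O")), "O")
    .rdd.map(row =>
      Edge(row.getAs[Long]("idS"), row.getAs[Long]("idO"), row.getAs[String]("P")))
  Graph(vertices, edges)
}

val graph = buildGraph(df)

// Each triplet exposes the source label, the edge attribute and the destination label.
val triples = graph.triplets.map(t => (t.srcAttr, t.attr, t.dstAttr)).collect().toSet
triples.foreach(println)
```

If the triplets printed match the rows of the input DataFrame, the IDs were assigned and joined consistently.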