In GraphX, how can I partition a graph by its topology using a custom PartitionStrategy?

Question (votes: 1, answers: 1)

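I want to add a new PartitionStrategy that uses graph topology information. However, PartitionStrategy only exposes the method below, and I cannot find any method that receives the graph data.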

  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    println("partitioning!")
    numParts
  }

This method only receives the src and dst vertex IDs of a single edge.

In the Spark GraphX source, in org.apache.spark.graphx.impl.GraphImpl, I found the following code:

  override def partitionBy(
      partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
    val edTag = classTag[ED]
    val vdTag = classTag[VD]
    val newEdges = edges.withPartitionsRDD(edges.map { e =>
      val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
      (part, (e.srcId, e.dstId, e.attr))
    }
      .partitionBy(new HashPartitioner(numPartitions))
      .mapPartitionsWithIndex(
        { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
          val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
          iter.foreach { message =>
            val data = message._2
            builder.add(data._1, data._2, data._3)
          }
          val edgePartition = builder.toEdgePartition
          Iterator((pid, edgePartition))
        }, preservesPartitioning = true)).cache()
    GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
  }

The .partitionBy(new HashPartitioner(numPartitions)) call resolves to partitionBy in the PairRDDFunctions class, shown below:

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, V](self, partitioner)
    }
  }

HashPartitioner is defined as follows:

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

However, none of these functions has access to the graph data.
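For reference, the way HashPartitioner turns the value returned by getPartition into an actual partition index can be sketched in plain Scala. Utils.nonNegativeMod is internal to Spark, so the helper below is a standalone stand-in written for illustration, and HashPartitionSketch and partitionFor are hypothetical names, not Spark APIs.

```scala
object HashPartitionSketch {
  // Standalone stand-in for Spark's internal Utils.nonNegativeMod:
  // like x % mod, but the result is always in [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Mirrors the logic of HashPartitioner.getPartition shown above
  // (hypothetical helper name, assumes numPartitions > 0).
  def partitionFor(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }
}
```

Because an Int key hashes to its own value, a PartitionID in the range 0 until numParts is kept unchanged by this step. The placeholder getPartition in the question returns numParts itself, which this mapping would send to partition 0 for every edge.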

I read the code of PowerGraph's distributed_constrained_random_ingress.hpp and PowerLyra's distributed_hybrid_ingress.hpp. In their preprocessing phase they have access to the graph, so they can use its topology information.

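I want to use the graph topology information too, but I don't know how to add functionality in Spark that reads the graph data and then assigns a new PartitionID to each edge.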


scala apache-spark spark-graphx
1 answer
0
votes

Here is one approach:
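A minimal sketch of one possible approach, assuming the topology information you need (here, vertex degrees) is small enough to collect on the driver: compute it from the graph in a first pass, then pass it to the strategy's constructor so that getPartition can look it up for each edge. DegreeAwareStrategy and repartition are hypothetical names, and the placement rule (hash the lower-degree endpoint) is only illustrative, not a port of the PowerGraph or PowerLyra ingress code.

```scala
import scala.reflect.ClassTag
import org.apache.spark.graphx.{Graph, PartitionID, PartitionStrategy, VertexId}

// Hypothetical strategy: place each edge according to its lower-degree endpoint.
// `degrees` is serialized to the executors together with the strategy,
// which assumes the whole map fits in memory.
class DegreeAwareStrategy(degrees: Map[VertexId, Int]) extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val srcDeg = degrees.getOrElse(src, 0)
    val dstDeg = degrees.getOrElse(dst, 0)
    val anchor = if (srcDeg <= dstDeg) src else dst
    // Map the anchor into [0, numParts), also for negative vertex IDs.
    (((anchor % numParts) + numParts) % numParts).toInt
  }
}

object DegreeAwareStrategy {
  // First pass gathers the topology information, second pass repartitions the edges.
  def repartition[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], numParts: Int): Graph[VD, ED] = {
    val degrees: Map[VertexId, Int] = graph.degrees.collect().toMap
    graph.partitionBy(new DegreeAwareStrategy(degrees), numParts)
  }
}
```

Calling DegreeAwareStrategy.repartition(graph, 8) would then return a graph whose edges are placed by this rule. For graphs whose degree table does not fit on the driver, the same idea would need a different mechanism, such as joining the degree information onto the edges and building the partitioned edge RDD by hand.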
