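I want to add a new PartitionStrategy that uses graph topology information. However, I found that PartitionStrategy only has the following function, and I cannot find any function that receives the graph data.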
override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
  println("partitioning!")
  numParts
}
This function only receives the src and dst IDs of a single edge.
In the Spark GraphX source, in org.apache.spark.graphx.impl.GraphImpl, I found the following code:
override def partitionBy(
    partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
  val edTag = classTag[ED]
  val vdTag = classTag[VD]
  val newEdges = edges.withPartitionsRDD(edges.map { e =>
    val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
    (part, (e.srcId, e.dstId, e.attr))
  }
    .partitionBy(new HashPartitioner(numPartitions))
    .mapPartitionsWithIndex(
      { (pid: Int, iter: Iterator[(PartitionID, (VertexId, VertexId, ED))]) =>
        val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
        iter.foreach { message =>
          val data = message._2
          builder.add(data._1, data._2, data._3)
        }
        val edgePartition = builder.toEdgePartition
        Iterator((pid, edgePartition))
      }, preservesPartitioning = true)).cache()
  GraphImpl.fromExistingRDDs(vertices.withEdges(newEdges), newEdges)
}
In the call .partitionBy(new HashPartitioner(numPartitions)), partitionBy comes from the PairRDDFunctions class, shown below:
/**
 * Return a copy of the RDD partitioned using the specified partitioner.
 */
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
HashPartitioner is defined as follows:
/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}
However, none of these functions has access to the graph data.
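To make the role of HashPartitioner in this pipeline concrete, here is a small plain-Scala sketch (no Spark needed). It re-implements the arithmetic of Utils.nonNegativeMod locally, since Utils is private to Spark; the names HashPartitionDemo and hashPartition are made up for illustration. It suggests that, because the shuffle key is the Int returned by getPartition, an in-range PartitionID maps to the shuffle partition with the same index.

```scala
// Illustrative re-implementation; not Spark's own classes.
object HashPartitionDemo {
  // Same arithmetic as Spark's Utils.nonNegativeMod.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // What HashPartitioner.getPartition computes for a key.
  def hashPartition(key: Any, numPartitions: Int): Int = key match {
    case null => 0
    case _    => nonNegativeMod(key.hashCode, numPartitions)
  }

  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    // Values a PartitionStrategy might return; 4 stands for the
    // placeholder above that returns numParts itself.
    Seq(0, 1, 2, 3, 4).foreach { pid =>
      val target = hashPartition(pid, numPartitions)
      println(s"strategy returned $pid -> shuffle partition $target")
    }
  }
}
```

So the PartitionID chosen by the strategy decides where the edge ends up, and an out-of-range value such as numParts wraps around to partition 0. The strategy is therefore the only place where topology could influence placement.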
I read the code in PowerGraph's distributed_constrained_random_ingress.hpp and PowerLyra's distributed_hybrid_ingress.hpp. In their pre-processing phase they have access to the graph, so they can use graph topology information.
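I want to use graph topology information too, but I do not know how to add a new function in Spark that gets the graph data and then assigns a new PartitionID to each edge.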
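I want to add a new PartitionStrategy that uses graph topology information. However, I found that PartitionStrategy only has the following function. I cannot find any function that receives the graph...
Here is one way: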
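One possible sketch of such an approach, which may differ from what the answer goes on to describe: run a pre-processing pass over the graph, then hand the result to the strategy through its constructor, since getPartition itself only sees src and dst. The code assumes Spark GraphX is on the classpath and that the in-degree map is small enough to collect on the driver and broadcast. HybridCutRule, HybridCutStrategy, HybridCutExample and the threshold parameter are hypothetical names. The placement rule is loosely modelled on the hybrid-cut idea (group the in-edges of low-degree vertices by target, spread the in-edges of high-degree vertices by source) and is not a port of the PowerLyra code.

```scala
import scala.reflect.ClassTag

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.graphx.{Graph, PartitionID, PartitionStrategy, VertexId}

// Pure placement rule, kept free of Spark types so it can be checked in isolation.
object HybridCutRule {
  // Mixing prime borrowed from GraphX's EdgePartition1D to spread vertex IDs.
  private val mixingPrime: Long = 1125899906842597L

  // Hash a vertex ID into the range [0, numParts).
  private def hashVertex(v: Long, numParts: Int): Int = {
    val m = (v * mixingPrime) % numParts
    (if (m < 0) m + numParts else m).toInt
  }

  def choose(src: Long, dst: Long, dstInDegree: Int, threshold: Int, numParts: Int): Int =
    if (dstInDegree > threshold) hashVertex(src, numParts) // high-degree target: spread by source
    else hashVertex(dst, numParts)                         // low-degree target: group by target
}

// Hypothetical strategy: the topology arrives through the constructor.
class HybridCutStrategy(inDegrees: Broadcast[Map[VertexId, Int]], threshold: Int)
    extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    HybridCutRule.choose(src, dst, inDegrees.value.getOrElse(dst, 0), threshold, numParts)
}

object HybridCutExample {
  def repartition[VD: ClassTag, ED: ClassTag](
      graph: Graph[VD, ED], numParts: Int, threshold: Int): Graph[VD, ED] = {
    // Pre-processing pass: gather in-degrees on the driver (assumes the map fits in memory).
    val inDegrees: Map[VertexId, Int] = graph.inDegrees.collect().toMap
    val bc = graph.vertices.sparkContext.broadcast(inDegrees)
    graph.partitionBy(new HybridCutStrategy(bc, threshold), numParts)
  }
}
```

With a graph already loaded, HybridCutExample.repartition(graph, numParts = 8, threshold = 100) would return the repartitioned graph. For graphs whose degree table does not fit on the driver, a different design would be needed, for example joining the degrees onto the edges and building the partition keys in an RDD.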