我已经编写了一个最短路径 Pregel 应用程序,我试图通过 Bitnami Docker 将其分发到 Apache Spark 独立集群。在启动 Pregel 功能之前,脚本中的所有内容都可以正常运行。然后,什么也没有发生。我正在使用 Apache Spark 3.3.2 和 Scala 2.12.
我尝试运行的脚本来自 Spark 存储库,它一直运行到
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
行,那里没有任何反应。它不会停止,但会卡住。主日志是说“启动执行程序应用程序”“删除执行程序应用程序”“启动执行程序应用程序”等。工作日志只是设置、完成、清理然后再次重复。
import org.apache.spark.sql.SparkSession
/**
* An example use the Pregel operator to express computation
* such as single source shortest path
* Run with
* {{{
* bin/run-example graphx.SSSPExample
* }}}
*/
object SSSPExample {
def main(args: Array[String]): Unit = {
// Creates a SparkSession.
val spark = SparkSession
.builder
.appName(s"${this.getClass.getSimpleName}")
.getOrCreate()
val sc = spark.sparkContext
// $example on$
// A graph with edge attributes containing distances
val graph: Graph[Long, Double] =
GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // The ultimate source
// Initialize the graph such that all vertices except the root have distance infinity.
val initialGraph = graph.mapVertices((id, _) =>
if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // Send Message
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a, b) => math.min(a, b) // Merge Message
)
println(sssp.vertices.collect.mkString("\n"))
// $example off$
spark.stop()
}
}
有人知道发生了什么事吗?我认为这很奇怪,因为 Pregel API 应该是可分发的。
我尝试在 Pregel 中单独运行各个操作(triplet、Iterator、math.min 等),它们能够在 Docker 集群上运行。