跳过的阶段对Spark作业有什么性能影响吗?

问题描述 投票:0回答:1

我正在运行一个Spark结构化流式作业,包括创建一个空的数据框架,并通过每个微批处理更新它,如下图所示。为了避免重新计算,我在循环内每次更新后都将更新后的StaticDF持久化到内存中,这有助于跳过那些额外的阶段。这有助于跳过每一个新的微批处理所产生的额外阶段。

我的问题

1) 即使完成的总阶段数保持不变,因为增加的阶段总是被跳过,但这是否会导致性能问题,因为在同一时间点上可能有数百万个跳过的阶段? 2) 当部分或全部缓存的RDD不可用时,会发生什么?nodeexecutor故障)。Spark文档中说,到目前为止,它并没有将从多个微批中接收到的全部数据实体化,那么是否意味着它需要再次从Kafka读取所有事件来重新生成staticDF?

// one time creation of empty static(not streaming) dataframe
val staticDF_schema = new StructType()
      .add("product_id", LongType)
      .add("created_at", LongType)
var staticDF = sparkSession
.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], staticDF_schema)

// Note : streamingDF was created from Kafka source
    streamingDF.writeStream
      .trigger(Trigger.ProcessingTime(10000L))
      .foreachBatch {
        (micro_batch_DF: DataFrame) => {

        // fetching max created_at for each product_id in current micro-batch
          val staging_df = micro_batch_DF.groupBy("product_id")
            .agg(max("created").alias("created"))

          // Updating staticDF using current micro batch
          staticDF = staticDF.unionByName(staging_df)
          staticDF = staticDF
            .withColumn("rnk",
              row_number().over(Window.partitionBy("product_id").orderBy(desc("created_at")))
            ).filter("rnk = 1")
            .drop("rnk")
              .cache()

          }

enter image description here

scala apache-spark spark-streaming spark-structured-streaming spark-streaming-kafka
1个回答
0
投票

即使跳过的阶段不需要任何计算,但我的工作在一定数量的批次后开始失败。这是因为DAG随着每一个批次的执行而增长,使得它无法管理,并抛出堆栈溢出异常。

为了避免这种情况,我不得不打破火花系,使阶段数不会随着每次运行而增加(即使跳过了阶段)。

© www.soinside.com 2019 - 2024. All rights reserved.