我正在运行一个Spark结构化流式作业,包括创建一个空的数据框架,并通过每个微批处理更新它,如下图所示。为了避免重新计算,我在循环内每次更新后都将更新后的StaticDF持久化到内存中,这有助于跳过那些额外的阶段。这有助于跳过每一个新的微批处理所产生的额外阶段。
我的问题
1) 即使完成的总阶段数保持不变,因为增加的阶段总是被跳过,但这是否会导致性能问题,因为在同一时间点上可能有数百万个跳过的阶段? 2) 当部分或全部缓存的RDD不可用时,会发生什么?nodeexecutor故障)。Spark文档中说,到目前为止,它并没有将从多个微批中接收到的全部数据实体化,那么是否意味着它需要再次从Kafka读取所有事件来重新生成staticDF?
// one time creation of empty static(not streaming) dataframe
val staticDF_schema = new StructType()
.add("product_id", LongType)
.add("created_at", LongType)
var staticDF = sparkSession
.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], staticDF_schema)
// Note : streamingDF was created from Kafka source
streamingDF.writeStream
.trigger(Trigger.ProcessingTime(10000L))
.foreachBatch {
(micro_batch_DF: DataFrame) => {
// fetching max created_at for each product_id in current micro-batch
val staging_df = micro_batch_DF.groupBy("product_id")
.agg(max("created").alias("created"))
// Updating staticDF using current micro batch
staticDF = staticDF.unionByName(staging_df)
staticDF = staticDF
.withColumn("rnk",
row_number().over(Window.partitionBy("product_id").orderBy(desc("created_at")))
).filter("rnk = 1")
.drop("rnk")
.cache()
}
即使跳过的阶段不需要任何计算,但我的工作在一定数量的批次后开始失败。这是因为DAG随着每一个批次的执行而增长,使得它无法管理,并抛出堆栈溢出异常。
为了避免这种情况,我不得不打破火花系,使阶段数不会随着每次运行而增加(即使跳过了阶段)。