PySpark 结构化流每批 2 个 SQL(长 addBatch 执行)

问题描述 投票:0回答:1

我有一个 Pyspark 结构化流应用程序 (3.3.2),它需要使用微批次从 Kafka 读取输入,执行复杂的逻辑,其中包括连接来自几个数据帧的数据。 该应用程序分为 2 个流式查询:

  1. 加载计算所需的数据帧 - 这是半静态数据(每天更改几次) - 出于性能原因,数据被缓存以与流查询 2 共享
  2. 执行逻辑本身 - 使用 foreachBatch,执行逻辑。注意:这个计划相当大。 问题是,每个微批次的流查询 2,我看到 2 个 SQL 正在运行,具有相同的 batch_id 和 run_id - 第一个需要 20 多秒,我不太明白它的目的是什么,第二个运行得相当快,看起来就像实际的查询执行一样。

有人可以解释一下为什么有 2 个 SQL 正在运行吗?为什么第一个需要 20 多秒?

我的直觉是,由于计划相当大,因此需要时间来处理计划(尽管我预计计划会“编译”一次,而不是重新评估每个微批次)。

我尝试了以下方法:

  • 将第二个流查询(逻辑)拆分为更小的流查询(具有更小的计划),将 Kafka 主题放在它们之间 - 我仍然看到每批有 2 个 SQL。
  • 使用本地文件系统作为检查点,而不是 GCS 中的路径(仅用于测试目的),这并没有减少第一个 SQL 的延迟
  • 关闭
    spark.sql.cbo.enabled
    功能标志

有几件事要提:

  • 当分解为较小的流查询时,“小计划”(只有很少的转换)流查询的第一个 SQL 运行时间较短(约 2 秒,仍然相当多)
  • 2 个 SQL 部分并行运行,在第一个 SQL 执行期间的某个时刻,第二个 SQL 开始并行运行。

See screenshot

apache-spark pyspark spark-structured-streaming
1个回答
0
投票

这里“可能”发生的是,您正在运行带有水印的有状态查询;也就是说,使用具有重复数据删除、聚合、联接或 (Flat)MapGroupsWithState 的水印。 如果您的查询属于这种类型,那么发生这种情况的原因如下:由于结构化流 (SS) 实现统计运算符(相对于水印)的方式,会发生两批。在批处理中处理完所有数据后,结构化流按以下顺序执行两件事:

它使用
    current
  1. 水印来确定哪些元素可以从状态中删除。 然后,它会通过获取当前批次的最大事件时间并减去
  2. .withWatermark
  3. 调用中给出的延迟来更新要在下一批中使用的水印。
    
    
  4. 问题在于,结构化流可能具有无法被当前水印删除的状态的记录,但
可以

被更新的水印删除。结果,它运行另一个“无数据”批次,只是将步骤 2 中的水印应用到状态记录。这实际上是一个配置,它很好地记录在源代码中 一个例子

让我们考虑一个 5 分钟的翻滚聚合(即窗口是 0-5、5-10 等)和 10 分钟的水印。假设我们只是计算每个窗口中的元素数量。

在我们的第一批中,我们的水印从 0 开始;假设我们在时间 12 和 13 收到两条记录。处理它们后,我们的内部状态如下所示:

    [10, 15] -> 2 records
  • 
    
  • 我们的
current

水印是 0,不大于 15,因此该记录不会向下游发出。然后,水印更新为 13 - 10,即 3。无数据批次运行,但由于 3 仍小于 15,因此不会发出任何数据。 因此,无数据批处理并没有多大帮助,但让我们考虑一下当我们收到更多记录时会发生什么:14、26、28。处理它们后,我们的状态如下所示:

    [10, 15] -> 3 records
  • [25, 30] -> 2 records
  • 
    
  • 我们的
current

水印仍然是3,不大于15,所以什么也没有发出。然后,水印更新为 28 - 10 = 18。然后,结构化流运行其无数据批次。由于 18 大于结束时间戳 15,因此 [10, 15] -> 3 records 记录

is
向下游发出。然后,我们的状态看起来像:

    [25, 30] -> 2 records
  • 
    
  • 因此,无数据批处理很有帮助,因为它会尽快发出结果(在结构化流的水印架构内)。请注意,此行为适用于
任何

有状态运算符,而不仅仅是聚合。

© www.soinside.com 2019 - 2024. All rights reserved.