我有一个 Pyspark 结构化流应用程序 (3.3.2),它需要使用微批次从 Kafka 读取输入,执行复杂的逻辑,其中包括连接来自几个数据帧的数据。 该应用程序分为 2 个流式查询:
有人可以解释一下为什么有 2 个 SQL 正在运行吗?为什么第一个需要 20 多秒?
我的直觉是,由于计划相当大,因此需要时间来处理计划(尽管我预计计划会“编译”一次,而不是重新评估每个微批次)。
我尝试了以下方法:
spark.sql.cbo.enabled
功能标志有几件事要提:
这里“可能”发生的是,您正在运行带有水印的有状态查询;也就是说,使用具有重复数据删除、聚合、联接或 (Flat)MapGroupsWithState 的水印。 如果您的查询属于这种类型,那么发生这种情况的原因如下:由于结构化流 (SS) 实现统计运算符(相对于水印)的方式,会发生两批。在批处理中处理完所有数据后,结构化流按以下顺序执行两件事:
它使用
.withWatermark
被更新的水印删除。结果,它运行另一个“无数据”批次,只是将步骤 2 中的水印应用到状态记录。这实际上是一个配置,它很好地记录在源代码中。 一个例子
在我们的第一批中,我们的水印从 0 开始;假设我们在时间 12 和 13 收到两条记录。处理它们后,我们的内部状态如下所示:
[10, 15] -> 2 records
水印是 0,不大于 15,因此该记录不会向下游发出。然后,水印更新为 13 - 10,即 3。无数据批次运行,但由于 3 仍小于 15,因此不会发出任何数据。 因此,无数据批处理并没有多大帮助,但让我们考虑一下当我们收到更多记录时会发生什么:14、26、28。处理它们后,我们的状态如下所示:
[10, 15] -> 3 records
[25, 30] -> 2 records
水印仍然是3,不大于15,所以什么也没有发出。然后,水印更新为 28 - 10 = 18。然后,结构化流运行其无数据批次。由于 18 大于结束时间戳 15,因此 [10, 15] -> 3 records
记录
is向下游发出。然后,我们的状态看起来像:
[25, 30] -> 2 records
有状态运算符,而不仅仅是聚合。