Spark 流 - withWatermark() 具有重复行为

问题描述 投票:0回答:1

我正在从 Kafka 读取数据(最早从 Offsets 开始),并将数据写入控制台进行一些测试。水印持续时间为 10 秒。遵循 Spark 文档 - https://spark.apache.org/docs/latest/structed-streaming-programming-guide.html#streaming-deduplication

result_df = parsed_df\
    .withWatermark("event_time", "10 seconds")\
    .dropDuplicates(["uuid"])

result_df\
    .writeStream\
    .option("checkpointLocation", "gs://test/checkpoint_ts/")\
    .format("console")\
    .start()

将下面的记录一一传递到kafka并从spark结构化流中读取每条记录:忽略下面的#开始

{"uuid":10150,"event_time":"2023-08-07T08:00:00.004071876Z"} # 通过,显示在控制台

{"uuid":10151,"event_time":"2023-08-07T09:00:00.004071876Z"} # 通过,显示在控制台

{"uuid":10152,"event_time":"2023-08-07T10:00:00.004071876Z"} # 通过,显示在控制台

{"uuid":10150,"event_time":"2023-08-07T11:00:00.004071876Z"} # 已丢弃,不会显示在控制台上(这是预期的,现在给定的水印阈值是 2023-08-07T10:00: 00 - 10 秒,所以这条记录不应该通过吗?)

{"uuid":10153,"event_time":"2023-08-07T06:00:00.004071876Z"} # 已丢弃,预计 event_time 早于水印阈值

问题:读取uuid:10150的第4条记录时,水印阈值是2023-08-07T10:00:00 - 10秒。由于具有相同 uuid:10150 且 event_time 不同的第一条记录早于当前水印阈值,因此通过给定 Spark 流状态存储的第四条记录是否不应该在内存中保留第一条记录,以便 Spark 根据 uuid 检测到该记录为重复项?

apache-spark spark-streaming spark-structured-streaming
1个回答
0
投票

dropDuplicates
运算符有点微妙。以下是它工作的两种“模式”:

  1. 如果您不传入事件时间列,它将无限期地保持状态。在这种情况下,它会执行适当的“全局”重复数据删除,因为它会记住它看到的所有内容。
  2. 如果您传入事件时间列,它将清除小于水印的状态。它还使用您提供的所有列执行重复数据删除。

你做了什么

在您的情况下,您只传递了“值”列,即非事件时间列。这是“模式”1:

result_df = parsed_df\
    .withWatermark("event_time", "10 seconds")\
    .dropDuplicates(["uuid"]) # <--- event_time not being passed in!

因此,

dropDuplicates
将无限期地保留状态。因此,示例中的记录 0、1、2 和 4 将被允许通过,但记录 3 将被重复数据删除,因为其 UUID
10150
存在于记录 0 中。

你想做什么

真的,你打算做第二种“模式”:

result_df = parsed_df\
    .withWatermark("event_time", "10 seconds")\
    .dropDuplicates(["uuid", "event_time"])

让我们考虑一下您的示例,假设您这样做了。我假设每条记录都在自己的批次中进行处理,以便水印更新每条记录(结构化流仅更新每批次的水印)。

  1. 第一条记录被处理。 (10150,8 小时)被添加到状态中。水印更新为 07:59:50。我们发出记录,因为它是唯一的。
  2. 第二条记录已处理。 (10151,9 小时)添加到状态,水印更新为 08:59:50。我们发出记录。 由于水印现在大于状态中的另一条记录(10150,8 小时)的事件时间,因此我们从状态中删除(10150,8 小时)。现在,状态中唯一的东西是(10151,9小时)。
  3. 同样的事情。 (10152,10小时)被添加到状态,水印更新到09:59:50,我们发出记录。由于水印大于状态中的另一条记录(10151, 9 小时)的事件时间,因此我们从状态中删除(10151, 9 小时)。现在,状态中唯一的东西是(10152,10小时)。
  4. 现在,有一些微妙之处:下一条记录的 uuid 10150 不是全局唯一的,但由于我们由于水印而丢弃了状态,操作员不知道它之前是否见过 10150。结果,我们实际上发出了这个记录。 (10150, 11 小时) 添加到状态,水印更新为 10:59:50,我们从状态中删除 (10152, 10 小时)。 此时水印为10:59:50。由于最后一条记录的时间戳小于水印,因此我们将其删除。在您的
  5. StreamingQueryProgress
  6. 中,您会看到
    numRowsDroppedByWatermark
    是 1。
    
    
  7. 结论

如果您的流上有
    withWatermark
  • 调用,您确实应该将正在使用的事件时间列的名称传递给
    dropDuplicates
    操作员。
    事件时间小于水印的记录将被删除。
  • 将发出不是全局唯一的记录,但由于其重复项已从状态存储中删除而具有唯一性。这可能会导致下游重复,特别是如果您的水印延迟太小。
  • 如果您
  • 真的
  • 无法容忍重复,请设置较大的水印延迟或进行真正的无水印重复数据删除。
© www.soinside.com 2019 - 2024. All rights reserved.