我正在从 Kafka 读取数据(最早从 Offsets 开始),并将数据写入控制台进行一些测试。水印持续时间为 10 秒。遵循 Spark 文档 - https://spark.apache.org/docs/latest/structed-streaming-programming-guide.html#streaming-deduplication
result_df = parsed_df\
.withWatermark("event_time", "10 seconds")\
.dropDuplicates(["uuid"])
result_df\
.writeStream\
.option("checkpointLocation", "gs://test/checkpoint_ts/")\
.format("console")\
.start()
将下面的记录一一传递到kafka并从spark结构化流中读取每条记录:忽略下面的#开始
{"uuid":10150,"event_time":"2023-08-07T08:00:00.004071876Z"} # 通过,显示在控制台
{"uuid":10151,"event_time":"2023-08-07T09:00:00.004071876Z"} # 通过,显示在控制台
{"uuid":10152,"event_time":"2023-08-07T10:00:00.004071876Z"} # 通过,显示在控制台
{"uuid":10150,"event_time":"2023-08-07T11:00:00.004071876Z"} # 已丢弃,不会显示在控制台上(这是预期的,现在给定的水印阈值是 2023-08-07T10:00: 00 - 10 秒,所以这条记录不应该通过吗?)
{"uuid":10153,"event_time":"2023-08-07T06:00:00.004071876Z"} # 已丢弃,预计 event_time 早于水印阈值
问题:读取uuid:10150的第4条记录时,水印阈值是2023-08-07T10:00:00 - 10秒。由于具有相同 uuid:10150 且 event_time 不同的第一条记录早于当前水印阈值,因此通过给定 Spark 流状态存储的第四条记录是否不应该在内存中保留第一条记录,以便 Spark 根据 uuid 检测到该记录为重复项?
dropDuplicates
运算符有点微妙。以下是它工作的两种“模式”:
在您的情况下,您只传递了“值”列,即非事件时间列。这是“模式”1:
result_df = parsed_df\
.withWatermark("event_time", "10 seconds")\
.dropDuplicates(["uuid"]) # <--- event_time not being passed in!
因此,
dropDuplicates
将无限期地保留状态。因此,示例中的记录 0、1、2 和 4 将被允许通过,但记录 3 将被重复数据删除,因为其 UUID 10150
存在于记录 0 中。
真的,你打算做第二种“模式”:
result_df = parsed_df\
.withWatermark("event_time", "10 seconds")\
.dropDuplicates(["uuid", "event_time"])
让我们考虑一下您的示例,假设您这样做了。我假设每条记录都在自己的批次中进行处理,以便水印更新每条记录(结构化流仅更新每批次的水印)。
StreamingQueryProgress
numRowsDroppedByWatermark
是 1。
withWatermark
dropDuplicates
操作员。事件时间小于水印的记录将被删除。