I have a sample Spark Structured Streaming job in which I am trying to implement/test watermarking to account for late-arriving data. Somehow the watermark is being ignored and old data is still published, even when the old data's timestamp is older than (max(event timestamp) - watermark). Here is the code:
from pyspark.sql.functions import col, from_json, window
from pyspark.sql.types import StructType, StructField, LongType, TimestampType

schema = StructType([
    StructField("temparature", LongType(), False),
    StructField("ts", TimestampType(), False),
    StructField("insert_ts", TimestampType(), False)
])
streamingDataFrame = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafkaBrokers) \
.option("group.id", 'watermark-grp') \
.option("subscribe", topic) \
.option("failOnDataLoss", "false") \
.option("includeHeaders", "true") \
.option("startingOffsets", "latest") \
.load() \
.select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))
resultC = streamingDataFrame.select(
    col("parsed_value.ts").alias("timestamp"),
    col("parsed_value.temparature").alias("temparature"),
    col("parsed_value.insert_ts").alias("insert_ts"))

resultM = resultC \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window(resultC.timestamp, "10 minutes", "5 minutes")) \
    .agg({'temparature': 'sum'})

resultMF = resultM.select(
    col("window.start").alias("startOfWindowFrame"),
    col("window.end").alias("endOfWindowFrame"),
    col("sum(temparature)").alias("Sum_Temperature"))
result = resultMF \
    .writeStream \
    .outputMode('update') \
    .option("numRows", 1000) \
    .option("truncate", "false") \
    .format('console') \
    .option('checkpointLocation', checkpoint_path) \
    .queryName("sum_temparature") \
    .start()
result.awaitTermination()
Data fed into the Kafka topic:
+---------------------------------------------------------------------------------------------------+----+
|value |key |
+---------------------------------------------------------------------------------------------------+----+
|{"temparature":7,"insert_ts":"2023-03-16T15:32:35.160-07:00","ts":"2022-03-16T16:12:00.000-07:00"} |null|
|{"temparature":15,"insert_ts":"2023-03-16T15:33:24.933-07:00","ts":"2022-03-16T16:12:00.000-07:00"}|null|
|{"temparature":11,"insert_ts":"2023-03-16T15:37:36.844-07:00","ts":"2022-03-15T16:12:00.000-07:00"}|null|
|{"temparature":8,"insert_ts":"2023-03-16T15:41:33.312-07:00","ts":"2022-03-16T10:12:00.000-07:00"} |null|
|{"temparature":14,"insert_ts":"2023-03-16T15:42:27.627-07:00","ts":"2022-03-16T10:10:00.000-07:00"}|null|
|{"temparature":6,"insert_ts":"2023-03-16T15:44:44.508-07:00","ts":"2022-03-16T11:16:00.000-07:00"} |null|
|{"temparature":19,"insert_ts":"2023-03-16T15:46:15.486-07:00","ts":"2022-03-16T11:16:00.000-07:00"}|null|
|{"temparature":3,"insert_ts":"2023-03-16T16:10:15.676-07:00","ts":"2022-03-16T16:16:00.000-07:00"} |null|
|{"temparature":13,"insert_ts":"2023-03-16T16:11:52.194-07:00","ts":"2022-03-14T16:16:00.000-07:00"}|null|
+---------------------------------------------------------------------------------------------------+----+
Output of the structured stream:
-------------------------------------------
Batch: 14
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-16 16:15:00|2022-03-16 16:25:00|3 |
|2022-03-16 16:10:00|2022-03-16 16:20:00|3 |
+-------------------+-------------------+---------------+
-------------------------------------------
Batch: 15
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-14 16:15:00|2022-03-14 16:25:00|13 |
|2022-03-14 16:10:00|2022-03-14 16:20:00|13 |
+-------------------+-------------------+---------------+
The record with "ts": "2022-03-14 16:16:00.000-07:00" was the last record put on the Kafka topic, and its event time is two days older than the rest.
My understanding is that a window is closed once max(ts) - watermark > the window's end time. Here max(ts) = 2022-03-16 16:12:00.000, so any data streaming in with an event time earlier than max(ts) - watermark should be ignored.
So, what am I doing wrong here? Any input is appreciated.
TIA!
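The eviction rule described above can be sketched in plain Python (no Spark required) to make the semantics concrete. This is an illustration of the rule, not Spark's actual state-store code: the engine tracks the maximum event time seen, derives the watermark by subtracting the allowed lateness, and a window is only eligible for eviction once the watermark passes its end time.

```python
from datetime import datetime, timedelta

# Illustrative sketch (plain Python, not Spark) of watermark-based
# window eviction, using the 10-minute delay from the job above.
DELAY = timedelta(minutes=10)

def watermark(max_event_time):
    # Watermark = latest event time seen so far minus the allowed lateness.
    return max_event_time - DELAY

def window_closed(window_end, max_event_time):
    # A window can be evicted from state once the watermark passes its end.
    return watermark(max_event_time) > window_end

max_ts = datetime(2022, 3, 16, 16, 12)  # max(ts) observed in the stream

# A window ending two days before the watermark should be closed, so a
# late record targeting it would be dropped (if state was evicted).
old_window_end = datetime(2022, 3, 14, 16, 25)
print(window_closed(old_window_end, max_ts))     # True: eligible for eviction

# A window the watermark has not yet passed stays open, so "late" data
# targeting it is still accepted.
recent_window_end = datetime(2022, 3, 16, 16, 10)
print(window_closed(recent_window_end, max_ts))  # False: still open
```

By this rule, the Batch 15 output above (windows for 2022-03-14) should not have appeared, which is exactly the surprise in the question.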
Update: It appears that records within the watermark are guaranteed to be processed, while records outside the watermark may or may not be: if the window has already been evicted from the state store, the late record is not processed; otherwise it still is.
Is there any way to "ensure" (or force) that late data arriving after the watermark is not processed?
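Since the watermark is only an eviction hint, one workaround is to gate records explicitly before aggregating. Below is a hypothetical strict gate sketched in plain Python (it is not a Spark operator; in a real job you would express an equivalent filter on the DataFrame): it tracks the maximum event time seen and unconditionally drops any record that falls behind max_seen minus the delay, regardless of whether window state still exists.

```python
from datetime import datetime, timedelta

def strict_gate(records, delay):
    # Hypothetical strict late-data gate: drop every record whose event
    # time is older than (max event time seen so far) - delay.
    max_seen = None
    for ts, value in records:
        if max_seen is None or ts > max_seen:
            max_seen = ts
        if ts >= max_seen - delay:
            yield ts, value  # within the watermark: keep
        # else: silently drop the late record

events = [
    (datetime(2022, 3, 16, 16, 12), 7),   # advances max_seen
    (datetime(2022, 3, 16, 16, 16), 3),   # advances max_seen
    (datetime(2022, 3, 14, 16, 16), 13),  # 2 days late -> dropped
]
kept = list(strict_gate(events, timedelta(minutes=10)))
print([v for _, v in kept])  # [7, 3]: the 2-days-late record is gone
```

Another option within Spark itself is the `append` output mode, which only emits a window's result after the watermark has passed its end, so finalized results never include post-watermark stragglers (at the cost of delayed output).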
The problem was simple (and a bit silly): the watermark column name 'timestamp' is a reserved keyword. Changing it to a non-reserved name, e.g. 'ts', solved the problem. After the change, the watermark works fine!