Spark Structured Streaming - watermark ignored, old data emitted

Question · Votes: 0 · Answers: 1

I have a sample Spark Structured Streaming job in which I am trying to implement/test a watermark to account for late-arriving data.

Somehow the watermark is being ignored and old data is still being emitted, even though that data's event timestamp falls behind max(event timestamp) - watermark.

Here is the code:

from pyspark.sql.types import StructType, StructField, LongType, TimestampType
from pyspark.sql.functions import from_json, col, window

schema = StructType([
            StructField("temparature", LongType(), False),  # spelling matches the "temparature" field in the Kafka JSON
            StructField("ts", TimestampType(), False),
            StructField("insert_ts", TimestampType(), False)
        ])


streamingDataFrame = spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", kafkaBrokers) \
                .option("group.id", 'watermark-grp') \
                .option("subscribe", topic) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema=schema).alias("parsed_value"))

resultC = streamingDataFrame.select( col("parsed_value.ts").alias("timestamp") \
                   , col("parsed_value.temparature").alias("temparature"), col("parsed_value.insert_ts").alias("insert_ts"))



resultM = resultC. \
    withWatermark("timestamp", "10 minutes"). \
    groupBy(window(resultC.timestamp, "10 minutes", "5 minutes")). \
    agg({'temparature':'sum'})

resultMF = resultM. \
            select(col("window.start").alias("startOfWindowFrame"),col("window.end").alias("endOfWindowFrame") \
                          , col("sum(temparature)").alias("Sum_Temperature"))

result = resultMF. \
                     writeStream. \
                     outputMode('update'). \
                     option("numRows", 1000). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("sum_temparature"). \
                     start()

result.awaitTermination()

Data fed into the Kafka topic:

+---------------------------------------------------------------------------------------------------+----+
|value                                                                                              |key |
+---------------------------------------------------------------------------------------------------+----+
|{"temparature":7,"insert_ts":"2023-03-16T15:32:35.160-07:00","ts":"2022-03-16T16:12:00.000-07:00"} |null|
|{"temparature":15,"insert_ts":"2023-03-16T15:33:24.933-07:00","ts":"2022-03-16T16:12:00.000-07:00"}|null|
|{"temparature":11,"insert_ts":"2023-03-16T15:37:36.844-07:00","ts":"2022-03-15T16:12:00.000-07:00"}|null|
|{"temparature":8,"insert_ts":"2023-03-16T15:41:33.312-07:00","ts":"2022-03-16T10:12:00.000-07:00"} |null|
|{"temparature":14,"insert_ts":"2023-03-16T15:42:27.627-07:00","ts":"2022-03-16T10:10:00.000-07:00"}|null|
|{"temparature":6,"insert_ts":"2023-03-16T15:44:44.508-07:00","ts":"2022-03-16T11:16:00.000-07:00"} |null|
|{"temparature":19,"insert_ts":"2023-03-16T15:46:15.486-07:00","ts":"2022-03-16T11:16:00.000-07:00"}|null|
|{"temparature":3,"insert_ts":"2023-03-16T16:10:15.676-07:00","ts":"2022-03-16T16:16:00.000-07:00"} |null|
|{"temparature":13,"insert_ts":"2023-03-16T16:11:52.194-07:00","ts":"2022-03-14T16:16:00.000-07:00"}|null|
+---------------------------------------------------------------------------------------------------+----+

Output of the structured stream:

-------------------------------------------
Batch: 14
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-16 16:15:00|2022-03-16 16:25:00|3              |
|2022-03-16 16:10:00|2022-03-16 16:20:00|3              |
+-------------------+-------------------+---------------+

-------------------------------------------
Batch: 15
-------------------------------------------
+-------------------+-------------------+---------------+
|startOfWindowFrame |endOfWindowFrame   |Sum_Temperature|
+-------------------+-------------------+---------------+
|2022-03-14 16:15:00|2022-03-14 16:25:00|13             |
|2022-03-14 16:10:00|2022-03-14 16:20:00|13             |
+-------------------+-------------------+---------------+
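One detail visible in the output: the aggregation uses a sliding window (10-minute length, 5-minute slide), so every event belongs to length/slide = 2 overlapping windows, which is why each batch prints two rows for a single record. A small plain-Python sketch (no Spark involved) of how an event time maps to its sliding windows:

```python
from datetime import datetime, timedelta

def sliding_windows(event_ts, length, slide):
    """Return the [start, end) windows of the given length and slide
    that contain event_ts."""
    epoch = datetime(1970, 1, 1)
    # latest window start that is <= event_ts
    last_start = event_ts - ((event_ts - epoch) % slide)
    windows = []
    start = last_start
    while start + length > event_ts:
        windows.append((start, start + length))
        start -= slide
    return sorted(windows)

event = datetime(2022, 3, 16, 16, 16)  # "ts" of the 2022-03-16T16:16 record
for s, e in sliding_windows(event, timedelta(minutes=10), timedelta(minutes=5)):
    print(s.time(), "->", e.time())
# 16:10:00 -> 16:20:00
# 16:15:00 -> 16:25:00
```

These are exactly the two window frames shown for that record in Batch 14.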

The record with "ts": "2022-03-14T16:16:00.000-07:00" was the last one put into the Kafka topic, and its event time is two days older than the other records.

Per my understanding, a window is closed once max(ts) - watermark moves past the window's end time. Here max(ts) = 2022-03-16 16:12:00, so any data that flows in with an event time before max(ts) - watermark should be ignored.
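The arithmetic can be checked with a few lines of plain Python (no Spark; event times copied from the table above):

```python
from datetime import datetime, timedelta

# event times ("ts") from the Kafka records, in arrival order
events = [
    datetime(2022, 3, 16, 16, 12),  # temparature 7
    datetime(2022, 3, 16, 16, 12),  # temparature 15
    datetime(2022, 3, 15, 16, 12),  # temparature 11 (one day behind)
    datetime(2022, 3, 16, 10, 12),  # temparature 8
    datetime(2022, 3, 14, 16, 16),  # temparature 13 (two days behind)
]

delay = timedelta(minutes=10)       # the withWatermark("...", "10 minutes") delay
max_ts = datetime.min
for ts in events:
    max_ts = max(max_ts, ts)
    watermark = max_ts - delay
    status = "late (expected to be dropped)" if ts < watermark else "on time"
    print(ts, "| watermark:", watermark, "->", status)
```

The 2022-03-15 and 2022-03-14 records both fall well behind the watermark, yet Batch 15 above still aggregated the 2022-03-14 record.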

So, what am I doing wrong here? Any input is appreciated.

TIA!

Update: it seems the only guarantee is that records inside the watermark will be processed. Records outside the watermark may or may not be processed: if the window has already been cleaned from the state store, the late record is not processed; otherwise it still is.

Is there any way to "make sure" (or force) that late data arriving behind the watermark is not processed?
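To build intuition for that behavior, here is a toy plain-Python model (my own sketch, using tumbling windows for simplicity rather than the sliding windows in the job above, and not Spark's actual implementation): a late record is dropped only when its window has already been finalized, i.e. window end <= watermark.

```python
from datetime import datetime, timedelta

class ToyWindowStore:
    """Toy model of watermark-driven state eviction."""
    EPOCH = datetime(1970, 1, 1)

    def __init__(self, delay=timedelta(minutes=10),
                 window_len=timedelta(minutes=10)):
        self.delay = delay
        self.window_len = window_len
        self.state = {}              # window end -> running sum
        self.max_ts = datetime.min

    def process(self, ts, value):
        self.max_ts = max(self.max_ts, ts)
        watermark = self.max_ts - self.delay
        start = ts - ((ts - self.EPOCH) % self.window_len)
        end = start + self.window_len
        if end <= watermark:
            return "dropped"         # window already finalized
        self.state[end] = self.state.get(end, 0) + value
        # evict every window the watermark has passed
        for w_end in [e for e in self.state if e <= watermark]:
            del self.state[w_end]
        return "processed"

store = ToyWindowStore()
print(store.process(datetime(2022, 3, 16, 16, 12), 7))   # processed
print(store.process(datetime(2022, 3, 14, 16, 16), 13))  # dropped
```

In real Spark the second outcome is not guaranteed, which is exactly the "may or may not" caveat in the Update.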

apache-spark spark-structured-streaming watermark
1 Answer
0 votes

The issue was simple (and a bit silly): the watermark column name "timestamp" is a reserved keyword. Changing it to a non-reserved name, e.g. 'ts', solved the problem. After the change the watermark works fine!
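Applied to the code in the question, the fix is just the consistent rename (a fragment only, not a standalone job: `streamingDataFrame` is the stream defined above, and the rest of the pipeline is unchanged):

```python
# Use a non-reserved name ('ts', as the answer suggests) instead of
# 'timestamp' for the event-time column, and reference it consistently.
resultC = streamingDataFrame.select(
    col("parsed_value.ts").alias("ts"),
    col("parsed_value.temparature").alias("temparature"),
    col("parsed_value.insert_ts").alias("insert_ts"))

resultM = resultC \
    .withWatermark("ts", "10 minutes") \
    .groupBy(window(col("ts"), "10 minutes", "5 minutes")) \
    .agg({'temparature': 'sum'})
```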
