Spark watermark api 问题

问题描述 投票:0回答:0

对于 Structured streaming watermark is 1 hr set in api.

现在我在 Streaming Listener 中使用下面的这个 api:

**event: StreamingQueryListener.QueryProgressEvent**
triggerTime = Instant.parse(event.progress.timestamp)
watermarkTime = Instant.parse(event.progress.eventTime.getOrDefault("watermark", ""))

下面描述了 Spark 文档中的内容,但是当我在 SparkListener 中使用这个 api 时,当 Listener 被执行时,spark 正在到达另一批次

所以

triggerTime.getEpochSecond - watermarkTime.getEpochSecond -3600

Listener 中的上述代码给出了 -ve 答案,这意味着来自 eventTime 映射的水印显示的是最新事件,而不是触发发生的事件,因为 Listener 因 ListenerBus 队列大小而迟到。

逻辑上它应该是0作为第一项

triggerTime.getEpochSecond - watermarkTime.getEpochSecond
每个触发时间应该是3600然后我从中减少内容

但是由于 eventTime 地图给出了最新的水印,但触发时间是旧的,它产生的值小于 3600

当确实存在一些巨大的处理延迟时,它会上升 +ve 但在没有延迟的正常时间它仍然是负数

我想知道我的理解是否正确,它的发生是由于流监听器运行的延迟,因为没有其他值会使这个等式 -ve.

val eventTime: ju.Map[String, String]
spark-structured-streaming
© www.soinside.com 2019 - 2024. All rights reserved.