如何使结构化流中的 dropDuplicates 状态过期以避免 OOM?

问题描述 投票:0回答:4

我想使用 Spark 结构化流计算每天的唯一访问次数,所以我使用以下代码

.dropDuplicates("uuid")

并且在第二天,今天维护的状态应该被删除,以便我可以获得第二天的唯一访问的正确计数并避免 OOM。 Spark文档指示使用带有水印的dropDuplicates,例如:

.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")

但是必须在dropDuplicates中指定水印列。在这种情况下,uuid 和时间戳将用作组合键来对具有相同 uuid 和时间戳的元素进行重复数据删除,这不是我所期望的。

那么有完美的解决方案吗?

apache-spark duplicates apache-spark-sql out-of-memory spark-structured-streaming
4个回答
11
投票

经过几天的努力我终于自己找到了方法。

在研究watermarkdropDuplicates的源代码时,我发现watermark除了eventTime列之外,还支持window列,所以我们可以使用以下代码:

.select(
    window($"timestamp", "1 day"),
    $"timestamp",
    $"uuid"
  )
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")

由于同一天的所有事件都具有相同的窗口,因此这将产生与仅使用 uuid 进行重复数据删除相同的结果。希望可以帮助别人。


0
投票

以下是对 Spark 文档中建议的程序的修改。技巧是操纵事件时间,即将事件时间放入 桶。假设事件时间以毫秒为单位提供。

// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df) {
    // converts time in 15 minute buckets
    // timestamp - (timestamp % (15 * 60))
    Column bucketCol = functions.to_timestamp(
            col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
    df = df.withColumn("bucket", bucketCol);

    String windowDuration = "15 minutes";
    df = df.withWatermark("bucket", windowDuration)
            .dropDuplicates("uuid", "bucket");

    return df.drop("bucket");
}

0
投票

我发现窗口功能不起作用,所以我选择使用window.start或window.end。

.select(
   window($"timestamp", "1 day").start,
   $"timestamp",
   $"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")

0
投票

这个问题很老了!然而,对于现在面临类似问题的人来说,spark 最新版本为这种情况提供了 dropDuplicatesWithinWatermark 选项:

streamingDf = spark.readStream. ...

# deduplicate using guid column with watermark based on eventTime column
streamingDf \
  .withWatermark("eventTime", "10 hours") \
  .dropDuplicatesWithinWatermark("guid")

dropDuplicates 中不需要指定水印列。 参考:https://spark.apache.org/docs/latest/structed-streaming-programming-guide.html#streaming-deduplication

© www.soinside.com 2019 - 2024. All rights reserved.