Kafka 删除(逻辑删除)未更新 Spark 结构化流中的最大聚合

问题描述 投票:0回答:1

我正在对 Spark 结构化流 (Spark 3.0) 作业中的计算聚合进行原型设计,并将更新发布到 Kafka。我需要计算每个组的最大日期和所有时间的最大百分比(无窗口)。除了源流中的 Kafka 逻辑删除记录(删除)之外,代码看起来不错。流接收具有有效键和空值的 Kafka 记录,但最大聚合继续在计算中包含该记录。当从 Kafka 执行删除操作时,在不删除已删除记录的情况下重新计算的最佳选项是什么?

示例
产生的消息:

<"user1|1", {"user": "user1", "pct":30, "timestamp":"2021-01-01 01:00:00"}>  
<"user1|2", {"user": "user1", "pct":40, "timestamp":"2021-01-01 02:00:00"}>  
<"user1|2", null>

Spark 代码片段:

val usageStreamRaw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", usageTopic)
  .load()

val usageStream = usageStreamRaw
  .select(
    col("key").cast(StringType).as("key"),
    from_json(col("value").cast(StringType), valueSchema).as("json")
  )
  .selectExpr("key", "json.*")

val usageAgg = usageStream
  .groupBy("user")
  .agg(
    max("timestamp").as("maxTime"),
    max("pct").as("maxPct")
  )

val sq = usageAgg.writeStream
  .outputMode("update")
  .option("truncate", "false")
  .format("console")
  .start()

sq.awaitTermination()

对于 user1,

pct
列中的结果是 40,但删除后应该是 30。有没有好的方法可以使用 Spark 结构化流来做到这一点?

apache-spark apache-kafka spark-structured-streaming
1个回答
0
投票

您可以通过以下方式在每条消息中使用 Kafka 时间戳:

val usageStream = usageStreamRaw .select(col("key").cast(StringType).as("key"), from_json(col("value").cast(StringType), valueSchema).as("json"), col("timestamp")) .selectExpr("key", "json.*", "timestamp")
然后

    仅选择每个键的最新值,并且
  • 过滤掉
  • null
在对最大时间和百分比应用聚合之前。

© www.soinside.com 2019 - 2024. All rights reserved.