我正在对 Spark 结构化流 (Spark 3.0) 作业中的计算聚合进行原型设计,并将更新发布到 Kafka。我需要计算每个组的最大日期和所有时间的最大百分比(无窗口)。除了源流中的 Kafka 逻辑删除记录(删除)之外,代码看起来不错。流接收具有有效键和空值的 Kafka 记录,但最大聚合继续在计算中包含该记录。当从 Kafka 执行删除操作时,在不删除已删除记录的情况下重新计算的最佳选项是什么?
示例
产生的消息:
<"user1|1", {"user": "user1", "pct":30, "timestamp":"2021-01-01 01:00:00"}>
<"user1|2", {"user": "user1", "pct":40, "timestamp":"2021-01-01 02:00:00"}>
<"user1|2", null>
Spark 代码片段:
val usageStreamRaw = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", usageTopic)
.load()
val usageStream = usageStreamRaw
.select(
col("key").cast(StringType).as("key"),
from_json(col("value").cast(StringType), valueSchema).as("json")
)
.selectExpr("key", "json.*")
val usageAgg = usageStream
.groupBy("user")
.agg(
max("timestamp").as("maxTime"),
max("pct").as("maxPct")
)
val sq = usageAgg.writeStream
.outputMode("update")
.option("truncate", "false")
.format("console")
.start()
sq.awaitTermination()
对于 user1,
pct
列中的结果是 40,但删除后应该是 30。有没有好的方法可以使用 Spark 结构化流来做到这一点?
您可以通过以下方式在每条消息中使用 Kafka 时间戳:
val usageStream = usageStreamRaw
.select(col("key").cast(StringType).as("key"),
from_json(col("value").cast(StringType), valueSchema).as("json"),
col("timestamp"))
.selectExpr("key", "json.*", "timestamp")
然后
null
值