可以在完成输出模式下的Spark结构化流中丢弃/控制中间状态吗? (Spark 2.4.0)

问题描述 投票:0回答:1

我有一种情况,我想处理来自kafka主题的数据。我有这个特定的Java代码,可以从kafka主题中以流的形式读取数据。

Dataset<Row> streamObjs = sparkSession.readStream().format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", streamTopic)
                .option("failOnDataLoss", false).load();

[将其强制转换为String,定义架构,然后尝试使用水印(用于后期数据)和窗口(用于分组和聚合),最后输出到kafka接收器。

Dataset<Row> selectExprImporter = streamObjs.selectExpr("CAST(value AS STRING)");

StructType streamSchema = new StructType().add("id", DataTypes.StringType)
                .add("timestamp", DataTypes.LongType)
                .add("values", new MapType(DataTypes.StringType, DataTypes.DoubleType, false));

Dataset<Row> selectValueImporter = selectExprImporter
                .select(functions.from_json(new Column("value"), streamSchema ).alias("data"));
.
.
(More transformations/operations)
.
.

Dataset<Row> aggCount_15min = streamData.withWatermark("timestamp", "2 minute")
                .withColumn("frequency", functions.lit(15))
                .groupBy(new Column("id"), new Column("frequency"),
                        functions.window(new Column("timestamp"), "15 minute").as("time_range"))
                .agg(functions.mean("value").as("mean_value"), functions.sum("value").as("sum"),
                        functions.count(functions.lit(1)).as("number_of_values"))
                .filter("mean_value > 35").orderBy("id", "frequency", "time_range");

aggCount_15min.selectExpr("to_json(struct(*)) AS value").writeStream()
                .outputMode(OutputMode.Complete()).format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
                .option("topic", outputTopic).option("checkpointLocation", checkpointLocation).start().awaitTermination();

问题

  1. 我是正确的理解,在kafka接收器中使用Complete Output mode时,中间状态将一直持续增加,直到我收到OutOfMemory异常?

  2. 另外,完全输出模式的理想用例是什么?仅在中间数据/状态不增加时使用它吗?

  3. 我需要使用
  4. Complete Output模式,因为我想使用orderBy子句。有什么办法可以让我在每说30分钟后强制spark降低其状态并再次使用新数据?

  5. 是否有更好的方法不使用“完整输出”模式,但仍能获得所需的结果?除了星型结构的流媒体,我还应该使用其他内容吗?

所需的结果是按照上面的查询聚合和分组数据,然后在创建第一个批次后,删除所有状态并为下一个批次重新开始。这里批处理可以是上​​次处理的时间戳的函数。比如说放弃所有状态,并在当前时间戳距第一个接收到的时间戳超过20分钟时重新开始,或者如果它是窗口时间的函数(在本示例中为15分钟)更好,则说更好,例如说当处理了4个批次的15分钟窗口并为第5批到达前4批的下降状态,并从该批重新开始。

apache-spark apache-spark-sql spark-structured-streaming
1个回答
0
投票

回答您的问题和标题问题:

  1. 是,您是正确的。
  2. 用于所有数据的汇总。第二部分不合逻辑,我无法回答。
  3. 摘自手册:只有在聚合之后并在“完整输出模式”下,流数据集才支持排序操作。您无法删除状态,需要正常停止Streaming App并重新启动。
  4. 否,因为您有许多当前实施无法满足的要求。除非您下达命令并执行窗口操作(15,15),否则内存不能正确使用。

https://www.signifytechnology.com/blog/2019/10/windowing-kafka-streams-using-spark-structured-streaming-by-david-virgil-naranjo提供了一个很好的例子。

© www.soinside.com 2019 - 2024. All rights reserved.