我有一种情况,我想处理来自kafka主题的数据。我有这个特定的Java代码,可以从kafka主题中以流的形式读取数据。
Dataset<Row> streamObjs = sparkSession.readStream().format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers).option("subscribe", streamTopic)
.option("failOnDataLoss", false).load();
[将其强制转换为String,定义架构,然后尝试使用水印(用于后期数据)和窗口(用于分组和聚合),最后输出到kafka接收器。
Dataset<Row> selectExprImporter = streamObjs.selectExpr("CAST(value AS STRING)");
StructType streamSchema = new StructType().add("id", DataTypes.StringType)
.add("timestamp", DataTypes.LongType)
.add("values", new MapType(DataTypes.StringType, DataTypes.DoubleType, false));
Dataset<Row> selectValueImporter = selectExprImporter
.select(functions.from_json(new Column("value"), streamSchema ).alias("data"));
.
.
(More transformations/operations)
.
.
Dataset<Row> aggCount_15min = streamData.withWatermark("timestamp", "2 minute")
.withColumn("frequency", functions.lit(15))
.groupBy(new Column("id"), new Column("frequency"),
functions.window(new Column("timestamp"), "15 minute").as("time_range"))
.agg(functions.mean("value").as("mean_value"), functions.sum("value").as("sum"),
functions.count(functions.lit(1)).as("number_of_values"))
.filter("mean_value > 35").orderBy("id", "frequency", "time_range");
aggCount_15min.selectExpr("to_json(struct(*)) AS value").writeStream()
.outputMode(OutputMode.Complete()).format("kafka").option("kafka.bootstrap.servers", bootstrapServers)
.option("topic", outputTopic).option("checkpointLocation", checkpointLocation).start().awaitTermination();
问题
我是正确的理解,在kafka接收器中使用Complete Output mode时,中间状态将一直持续增加,直到我收到OutOfMemory异常?
另外,完全输出模式的理想用例是什么?仅在中间数据/状态不增加时使用它吗?
Complete Output模式,因为我想使用orderBy子句。有什么办法可以让我在每说30分钟后强制spark降低其状态并再次使用新数据?
是否有更好的方法不使用“完整输出”模式,但仍能获得所需的结果?除了星型结构的流媒体,我还应该使用其他内容吗?
所需的结果是按照上面的查询聚合和分组数据,然后在创建第一个批次后,删除所有状态并为下一个批次重新开始。这里批处理可以是上次处理的时间戳的函数。比如说放弃所有状态,并在当前时间戳距第一个接收到的时间戳超过20分钟时重新开始,或者如果它是窗口时间的函数(在本示例中为15分钟)更好,则说更好,例如说当处理了4个批次的15分钟窗口并为第5批到达前4批的下降状态,并从该批重新开始。
回答您的问题和标题问题: