Spark结构化流2.3.0中的水印

问题描述 投票:1回答:1

我在Spark Structured Streaming 2.3.0中从Kafka读取数据。数据包含有关某些教师的信息,其中包括TeacherId,teacherName和TeacherGroupsIds。 TeacherGroupsIds是一个数组列,其中包含该组的ID。在我的任务中,我必须将具有组ID的列映射到包含有关组名称的信息的列([1,2,3] => [Suns,Books,Flowers])。名称和ID存储在HBase中,并且每天都可以更改。稍后,我必须将数据发送到另一个Kafka主题。

因此,我从两个来源读取数据-Kafka和HBase。我使用shc库从HBase读取数据。

首先,我分解数组列(组ID),然后加入HBase的数据。

下一步,我想使用TeacherId汇总数据。但是我使用的附加模式不支持此操作。

我已经尝试过加水印,但目前不起作用。我添加了带有时间戳的新列,并按此列分组。

Dataset<Row> inputDataset = //reading from Kafka

Dataset<Row> explodedDataset = // explode function applied and join with HBase

Dataset<Row> outputDataset = explodedDataset
.withColumn("eventTime", lit(current_timestamp()))
.withWatermark("eventTime", "2 minutes")
.groupBy(window(col("eventTime"), "5 seconds"), col("teacherId"))
.agg(collect_list(col("groupname")));

实际结果显示输出处的数据框为空。没有任何行。

apache-spark apache-kafka spark-structured-streaming
1个回答
0
投票

问题是current_timestamp()

current_timestamp返回该时刻的时间戳,因此,如果使用此列创建一个数据帧并打印结果,则打印当前时间戳,但是如果处理df并打印同一列,则打印新的时间戳。

此解决方案在本地工作,但是有时在分布式系统中会失败,因为工作人员在收到打印数据的命令时,该数据已经超出了时间戳范围。

© www.soinside.com 2019 - 2024. All rights reserved.