我需要对来自Kafka的流数据进行一些汇总,并每M秒将结果的前10行输出到控制台。
input_df = (
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", "page_views")
.load()
.selectExpr('cast(value as string)')
)
...
...
# info has 2 cols: domain, uid (info = transformation of input_df)
# It's an example of what I want to do (like in simple pyspark)
stat = (
info
.groupby('domain')
.agg(
F.count(F.col('UID')).alias('view'),
F.countDistinct(F.col('UID')).alias('unique')
)
.sort(F.col("view").desc())
.limit(10)
)
query = (
stat
.writeStream
.outputMode("complete")
.format("console")
.option("truncate", "true")
.start()
)
此示例没有时间触发,但我可以自己完成。因为不允许使用countDistinct,所以我没有做运动的想法。我尝试为每个聚合设置2个df(df_1 =(域,视图),df_2 =(域,唯一)),然后将df_1与df_2结合在一起,但也不允许它具有多个聚合。所以对我来说这是死胡同。做出决定会很酷。
感谢您的关注!
您可以通过flatMapGroupWithState实现此功能,它是任意状态函数。此外,它还支持追加模式和更新模式。