Spark结构化流中的多个聚合和不同的函数

问题描述 投票:0回答:1

我需要对来自Kafka的流数据进行一些汇总,并每M秒将结果的前10行输出到控制台。

    input_df = (
       spark
       .readStream
       .format("kafka")
       .option("kafka.bootstrap.servers", brokers)
       .option("subscribe", "page_views")
       .load()
       .selectExpr('cast(value as string)')
    )

    ...
    ...

    # info has 2 cols: domain, uid  (info = transformation of input_df)
    # It's an example of what I want to do (like in simple pyspark)
    stat = (
        info
        .groupby('domain')
        .agg(
             F.count(F.col('UID')).alias('view'),
             F.countDistinct(F.col('UID')).alias('unique')
        )
        .sort(F.col("view").desc())
        .limit(10)
    )

    query = (
        stat
        .writeStream
        .outputMode("complete")
        .format("console")
        .option("truncate", "true")
        .start()
    )

此示例没有时间触发,但我可以自己完成。因为不允许使用countDistinct,所以我没有做运动的想法。我尝试为每个聚合设置2个df(df_1 =(域,视图),df_2 =(域,唯一)),然后将df_1与df_2结合在一起,但也不允许它具有多个聚合。所以对我来说这是死胡同。做出决定会很酷。

感谢您的关注!

pyspark apache-kafka apache-spark-sql pyspark-sql spark-structured-streaming
1个回答
0
投票

您可以通过flatMapGroupWithState实现此功能,它是任意状态函数。此外,它还支持追加模式和更新模式。

© www.soinside.com 2019 - 2024. All rights reserved.