在Spark中查询流数据集

问题描述 投票:1回答:2

我有一个streaming数据集,其列为:bag_id,ball_color。我想找到每个袋子最受欢迎的颜色。因此,我尝试了:

dataset.groupBy("bag_id", "color") # 1st aggregation
       .agg(count("color").as("color_count"))
       .groupBy("bag_id") # 2nd aggregation
       .agg(max("color_count"))

但是我有一个错误:

线程“主” org.apache.spark.sql.AnalysisException中的异常:流不支持多个流聚合DataFrames / Datasets ;;

我可以仅使用一个聚合函数来创建正确的查询吗?

apache-spark pyspark apache-spark-sql dataset spark-structured-streaming
2个回答
1
投票

有一个开放的Jira解决了此问题Spark-26655,到目前为止,我们无法在Streaming数据上运行多个聚合。

一种解决方法是执行one aggregation并保存回Kafka..etc,然后再次从kafka读取以执行另一种聚合。

(or)

我们只能对流数据运行一次聚合,并将其保存到HDFS / Hive / HBase并进行提取以执行其他聚合(这将是单独的工作)


1
投票

是,在Spark 2.4.4(目前最新)中不支持yest多个流聚合。但是,作为解决方法,您可以使用.foreachBatch() method

def foreach_batch_function(df, epoch_id):
  df.groupBy("bag_id","color")
  .agg(count("color").as("color_count"))
  .groupBy("bag_id").agg(max("color_count"))
  .show() # .show() is a dummy action

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()  

.foreachBatch()中,df不是流式df,因此您可以做任何您想做的事。

© www.soinside.com 2019 - 2024. All rights reserved.