我有一个streaming数据集,其列为:bag_id,ball_color。我想找到每个袋子最受欢迎的颜色。因此,我尝试了:
dataset.groupBy("bag_id", "color") # 1st aggregation
.agg(count("color").as("color_count"))
.groupBy("bag_id") # 2nd aggregation
.agg(max("color_count"))
但是我有一个错误:
线程“主” org.apache.spark.sql.AnalysisException中的异常:流不支持多个流聚合DataFrames / Datasets ;;
我可以仅使用一个聚合函数来创建正确的查询吗?
有一个开放的Jira解决了此问题Spark-26655,到目前为止,我们无法在Streaming数据上运行多个聚合。
一种解决方法是执行one aggregation
并保存回Kafka..etc,然后再次从kafka读取以执行另一种聚合。
(or)
我们只能对流数据运行一次聚合,并将其保存到HDFS / Hive / HBase并进行提取以执行其他聚合(这将是单独的工作)
是,在Spark 2.4.4(目前最新)中不支持yest多个流聚合。但是,作为解决方法,您可以使用.foreachBatch()
method:
def foreach_batch_function(df, epoch_id):
df.groupBy("bag_id","color")
.agg(count("color").as("color_count"))
.groupBy("bag_id").agg(max("color_count"))
.show() # .show() is a dummy action
streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
在.foreachBatch()
中,df不是流式df,因此您可以做任何您想做的事。