带有分组数据的 Spark 结构化流 - 每组一个微批次

问题描述 投票:0回答:1

如果对流数据帧数据进行分组,在 Spark 结构化流中是否可以在单独的单个微批次中处理每个组?像这样的东西:

dfs = ...
dfs.groupBy(...).writestream...foreachbatch()

谢谢!

apache-spark pyspark azure-databricks spark-structured-streaming
1个回答
0
投票

您无法在微批次中单独运行分组数据。如果您希望根据分组列将数据放在单独的文件夹中,可以使用下面的代码。

def batch_oper(df, epoch_id):
  df.groupby(F.col("country")).count() \
    .write.format("csv").mode("append") \
    .partitionBy("country") \
    .save("base_path_of_the_location")

df.writeStream \
  .outputMode("append") \
  .foreachBatch(batch_oper) \
  .option("checkpointLocation", "base_path_of_the_location/checkpoint") \
  .start()

def batch_oper(df, epoch_id):
    df.write.format("csv").partitionBy("country").save("dbfs:/part_csvs1/")

streaming_df.groupBy(F.col("country")).count().writeStream.outputMode("complete").foreachBatch(batch_oper).option("checkpointLocation", "dbfs:/part_csvs/checkpoint").start()

输出:

enter image description here

在这里,它为每个国家/地区创建单独的文件夹并存储数据。

如果您想对分组数据执行自定义操作,PySpark 不支持用户定义的聚合函数,但您可以按照this解决方案中的方法进行操作。

© www.soinside.com 2019 - 2024. All rights reserved.