使用Azure Databricks中的火花流将数据加载到天蓝色的blob中

问题描述 投票:0回答:1

我正在Azure Databricks中尝试此代码:

jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

// readstream from azure event hub
df = spark.readStream.format("eventhubs").options(**ehConf).schema(jsonSchema).load() 
 streamingCountsDF = (df.withWatermark("Time", "500 milliseconds").groupBy(
      df.body,
      window(df.enqueuedTime, "1 hour"))
    .count()
)

//writing stream to azure blob
 streamingCountsDF.writeStream.format("parquet").option("path", file_location).option("checkpointLocation", "/tmp/checkpoint").start() 

file_location is the azure blob url.

我在最后一步遇到错误:

org.apache.spark.sql.AnalysisException:当不带水印的流式数据帧/数据集上有流式聚合时,不支持追加输出模式;

我们如何解决这个问题?

pyspark spark-streaming azure-storage-blobs databricks azure-databricks
1个回答
0
投票

根据我们使用的查询,我们需要选择适当的输出模式。选择一个错误会导致运行时异常,如下所示。

org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on 
streaming DataFrames/DataSets without watermark;

参考:您可以了解有关不同查询与不同输出模式here的兼容性的更多信息。

[在结构化流中,流处理的输出是数据帧或表。查询的输出模式表示此无限输出表如何写入接收器,在我们的示例中为控制台。

有三种输出模式:

Append-在这种模式下,只有最后一个触发器(批处理)到达的记录才会被写入接收器。这对于简单的转换(例如选择,过滤器等)是受支持的。由于这些转换不会更改为较早的批次计算的行,因此添加新行的效果很好。

Complete-在此模式下,每次将完整的结果表写入接收器。通常与聚合查询一起使用。如果是汇总,结果的输出将在新数据到达时以及不断变化的时候保持不变。

Update-在此模式下,仅将最后一次触发更改的记录写入接收器。

© www.soinside.com 2019 - 2024. All rights reserved.