我正在Azure Databricks中尝试此代码:
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])
// readstream from azure event hub
df = spark.readStream.format("eventhubs").options(**ehConf).schema(jsonSchema).load()
streamingCountsDF = (df.withWatermark("Time", "500 milliseconds").groupBy(
df.body,
window(df.enqueuedTime, "1 hour"))
.count()
)
//writing stream to azure blob
streamingCountsDF.writeStream.format("parquet").option("path", file_location).option("checkpointLocation", "/tmp/checkpoint").start()
file_location is the azure blob url.
我在最后一步遇到错误:
org.apache.spark.sql.AnalysisException:当不带水印的流式数据帧/数据集上有流式聚合时,不支持追加输出模式;
我们如何解决这个问题?
根据我们使用的查询,我们需要选择适当的输出模式。选择一个错误会导致运行时异常,如下所示。
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on
streaming DataFrames/DataSets without watermark;
参考:您可以了解有关不同查询与不同输出模式here的兼容性的更多信息。
[在结构化流中,流处理的输出是数据帧或表。查询的输出模式表示此无限输出表如何写入接收器,在我们的示例中为控制台。
有三种输出模式:
Append-在这种模式下,只有最后一个触发器(批处理)到达的记录才会被写入接收器。这对于简单的转换(例如选择,过滤器等)是受支持的。由于这些转换不会更改为较早的批次计算的行,因此添加新行的效果很好。
Complete-在此模式下,每次将完整的结果表写入接收器。通常与聚合查询一起使用。如果是汇总,结果的输出将在新数据到达时以及不断变化的时候保持不变。
Update-在此模式下,仅将最后一次触发更改的记录写入接收器。