databticks 中的 Foreachbatch 在第一个微批次完成后引起问题

问题描述 投票:0回答:0

我正在使用 foreachbatch 将流数据写入多个目标,并且它在第一次微批处理执行时工作正常。当它尝试运行第二个微批次时,它失败并出现以下错误。 “StreamingQueryException:查询 [id = 0d8e45ff-4f3a-42c0-964d-6f41c93df801,runId = 186a22bf-c75e-482b-bd4b-19b039a9aa38] 异常终止: abfss://[email protected]/primary/directory1 已经存在”

下面是我使用的 foreach 片段。

df_new = <<<some streaming dataset>>>

val appId = "1dbcd4f2-eeb7-11ed-a05b-0242ac120003" 

df_new.writeStream.format("delta").option("mergeSchema", "true").outputMode("append").option("checkpointLocation", "abfss://[email protected]/checkpoint/chkdir").foreachBatch { (batchDF: DataFrame, batchId: Long) =>

  batchDF.persist()

  val fc_final= batchDF.filter(col("msg_type") === "FC" ).drop(columnlist_fc:_*)

  fc_final.write.option("txnVersion", batchId).option("txnAppId", appId).save("abfss://[email protected]/primary/directory1")

  val hb_final = batchDF.filter(col("msg_type") =!= "FC" ).drop(columnlist_hb:_*)

  hb_final.write.partitionBy("occurrence_month").option("txnVersion", batchId).option("txnAppId", appId).save("abfss://[email protected]/primary/directory2")
  
  batchDF.unpersist()
()

}.start().awaitTermination()

我在这里缺少什么?为什么它不能将数据文件追加到 delta 目录,即使我指定了 mode=append。非常感谢您的帮助。

scala databricks spark-streaming azure-databricks
© www.soinside.com 2019 - 2024. All rights reserved.