现在我使用 Scala + Spark 将 DataFrame: df 写入 KustoCluster,我刚刚发现 DataFrameWriter.save() 没有回调选项,这是我的代码
var writer = df.write.format("com.microsoft.kusto.spark.synapse.datasource")
.option("spark.synapse.linkedService", linkedServiceName)
.option("kustoDatabase", database)
.option("kustoTable", table)
writer.mode(mode).save()
有时数据帧 df 可能很大,需要一些时间才能运行,并且可能会在一段时间后失败,但是我认为没有办法检查 DataFrameWriter 的最终 save() 结果,所以如果最终失败了,在 Spark 作业中没有办法判断。 有人建议我可以尝试读取输出表来检查 save() 过程是否最终成功。但这将是困难且耗时的。那么有什么方法或功能可以用来检查写入结果吗?
找到一种方法来检查数据帧写入结果而不读取输出表。
当您触发写入时,Spark 会创建一个操作,作业开始具体化为阶段和任务,作业的进度/状态最终将记录在 Spark UI 中。您可以创建实用程序来在作业运行时保持 Spark UI 视图处于活动状态/直到用户停止集群。
但是假设您无权访问 Spark UI,那么一种方法是将
save
逻辑包装在一个调用 writer
对象的方法中,该对象实现了 try-catch 块,如下所示:
def safeSave(df: DataFrame, writer: DataFrameWriter[Row], mode: SaveMode): Boolean = {
try {
writer.mode(mode).save()
true
} catch {
case e: Exception =>
println(s"Failed to save DataFrame: ${e.getMessage}")
false
}
}
然后调用返回布尔值的方法
True
如果保存成功。
val isSuccess = safeSave(df, writer, mode)
正如评论中提到的,如果存在将数据写入 Hadoop 之类的文件系统(HDFS、S3 等)的用例,那么您可以检查零字节
_SUCCESS
文件是否存在,该文件标记作为成功写入的指标。
以下是要点:
// Create a Hadoop Configuration and FileSystem
val conf = new Configuration()
val fs = FileSystem.get(conf)
// Define the path to the _SUCCESS file (write location)
val successFilePath = new Path(s"$pathToOutputDirectory/_SUCCESS")
// Check if the _SUCCESS file exists
if (fs.exists(successFilePath)) {
println("The save operation completed successfully.")
} else {
println("The save operation did not complete successfully.")
}
此外,值得注意的是,即使在基于 Hadoop 的文件系统中,也不能保证在所有场景下都会创建
_SUCCESS
文件。例如,如果写入操作由于某种原因被中断或失败,则可能无法创建 _SUCCESS
文件,因此您可以使用 try-catch 块,而不是手动检查它。
此外,某些配置或自定义输出格式(如 NoSQL 数据库)可能会选择不创建
_SUCCESS
文件。