如何在Spark中不读取输出表的情况下查看DataFrameWriter save()的最终写入结果?

问题描述 投票:0回答:1

现在我使用 Scala + Spark 将 DataFrame: df 写入 KustoCluster,我刚刚发现 DataFrameWriter.save() 没有回调选项,这是我的代码

    var writer = df.write.format("com.microsoft.kusto.spark.synapse.datasource")
      .option("spark.synapse.linkedService", linkedServiceName)
      .option("kustoDatabase", database)
      .option("kustoTable", table)

    writer.mode(mode).save()

有时数据帧 df 可能很大,需要一些时间才能运行,并且可能会在一段时间后失败,但是我认为没有办法检查 DataFrameWriter 的最终 save() 结果,所以如果最终失败了,在 Spark 作业中没有办法判断。 有人建议我可以尝试读取输出表来检查 save() 过程是否最终成功。但这将是困难且耗时的。那么有什么方法或功能可以用来检查写入结果吗?

找到一种方法来检查数据帧写入结果而不读取输出表。

dataframe scala apache-spark apache-spark-sql
1个回答
0
投票

当您触发写入时,Spark 会创建一个操作,作业开始具体化为阶段和任务,作业的进度/状态最终将记录在 Spark UI 中。您可以创建实用程序来在作业运行时保持 Spark UI 视图处于活动状态/直到用户停止集群。

但是假设您无权访问 Spark UI,那么一种方法是将

save
逻辑包装在一个调用
writer
对象的方法中,该对象实现了 try-catch 块,如下所示:

def safeSave(df: DataFrame, writer: DataFrameWriter[Row], mode: SaveMode): Boolean = {
  try {
    writer.mode(mode).save()
    true
  } catch {
    case e: Exception =>
      println(s"Failed to save DataFrame: ${e.getMessage}")
      false
  }
}

然后调用返回布尔值的方法

True
如果保存成功。

val isSuccess = safeSave(df, writer, mode)

正如评论中提到的,如果存在将数据写入 Hadoop 之类的文件系统(HDFS、S3 等)的用例,那么您可以检查零字节

_SUCCESS
文件是否存在,该文件标记作为成功写入的指标。

以下是要点:

// Create a Hadoop Configuration and FileSystem
val conf = new Configuration()
val fs = FileSystem.get(conf)

// Define the path to the _SUCCESS file (write location)
val successFilePath = new Path(s"$pathToOutputDirectory/_SUCCESS")

// Check if the _SUCCESS file exists
if (fs.exists(successFilePath)) {
  println("The save operation completed successfully.")
} else {
  println("The save operation did not complete successfully.")
}

此外,值得注意的是,即使在基于 Hadoop 的文件系统中,也不能保证在所有场景下都会创建

_SUCCESS
文件。例如,如果写入操作由于某种原因被中断或失败,则可能无法创建
_SUCCESS
文件,因此您可以使用 try-catch 块,而不是手动检查它。

此外,某些配置或自定义输出格式(如 NoSQL 数据库)可能会选择不创建

_SUCCESS
文件。

© www.soinside.com 2019 - 2024. All rights reserved.