FileNotFoundException: Spark save fails. Cannot clear cache from Dataset[T] avro

Problem description — votes: 0, answers: 1

I get the following error the second time I save a DataFrame in Avro. If I delete sub_folder/part-00000-XXX-c000.avro after saving and then try to save the same dataset again, this is what I get:

FileNotFoundException: File /.../main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  • If I delete the files not only from sub_folder but also from main_folder, the problem does not happen, but I cannot afford that.
  • The problem actually does not happen when saving the dataset in any other format.
  • Saving an empty dataset does not cause the error.

The message suggests that a table needs to be refreshed, but the output of sparkSession.catalog.listTables().show() shows there is no table to refresh:

+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+
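
For completeness, this is roughly what an explicit refresh would look like. It is shown only to illustrate the hint in the error message: with no registered table there is nothing for REFRESH TABLE to act on, and the path-based variant is just the closest equivalent (I am not claiming it fixes this case).

// Illustration of the error message's hint; not applicable here since no table is registered.
sparkSession.sql("REFRESH TABLE tableName")  // requires a registered table name
sparkSession.catalog.refreshByPath(path)     // path-based variant, no table required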

The previously saved DataFrame looks like this; the application is supposed to update it:

+--------------------+--------------------+
|              Col1  |               Col2 |
+--------------------+--------------------+
|[123456, , ABC, [...|[[v1CK, RAWNAME1_,..|
|[123456, , ABC, [...|[[BG8M, RAWNAME2_...|
+--------------------+--------------------+

To me this looks like a cache-invalidation problem, yet every attempt to clear the cache has failed:

 dataset.write
      .format("avro")
      .option("path", path)
      .mode(SaveMode.Overwrite) // Any save mode gives the same error
      .save()

// Moving this either before or after saving doesn't help.
sparkSession.catalog.clearCache()  

// This will not un-persist any cached data that is built upon this Dataset.
dataset.cache().unpersist()
dataset.unpersist() 

This is how I read the dataset:

private def doReadFromPath[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {

    val df = sparkSession.read
      .format("avro")
      .load(path)
      .select("*")

    df.as[T]
  }
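
For context, the failing round trip looks roughly like this (a simplified sketch: MyRecord stands in for the actual Avro-generated record type and applyUpdates for the update logic, both placeholders):

// Read the previously saved avro, update it, then write it back to the same path.
val existing: Dataset[MyRecord] = doReadFromPath[MyRecord](path)
val updated: Dataset[MyRecord] = applyUpdates(existing) // placeholder for the real update

updated.write
  .format("avro")
  .option("path", path)     // same path the data was just read from
  .mode(SaveMode.Overwrite)
  .save()                   // the second save fails with FileNotFoundException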

Finally, here is the stack trace. Thanks a lot for your help!

ERROR [task-result-getter-3] (Logging.scala:70) - Task 0 in stage 9.0 failed 1 times; aborting job
ERROR [main] (Logging.scala:91) - Aborting job 150de02a-ac6a-4d42-824d-5db44a98c19a.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/DATA/XXX/main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
    ... 10 more
apache-spark caching apache-spark-sql apache-spark-dataset spark-avro
1 Answer

2 votes

This problem occurs when you read from and write to the same location; it has also been discussed on this forum before.

The error message below is misleading; the actual problem is reading from and writing to the same location:

You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL

I have put together a different example from yours (parquet here, avro in your case).

I have two options for you.

Option 1 (cache and show work as shown below...):

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDS/toDF on a local Seq

val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

df.show(false)

df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back again

val df2 = df1.withColumn("cleanup", lit("Rod want to cleanup")) // like you said, the data you want to clean up

// THE 2 STEPS BELOW ARE IMPORTANT: `cache` plus a light action such as `show`,
// without which the FileNotFoundException will occur.

df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // any action works

df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("Rod saved in the same directory he read from; the final records after clean up are:")
df2.show(false)

Option 2:

1) Save the DataFrame to a different avro folder.

2) Delete the old avro folder.

3) Finally, rename the newly created avro folder to the old name; this will work (a sketch of these steps follows below).
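
A minimal sketch of these three steps using the Hadoop FileSystem API (folder names are placeholders and `dataset` is whatever you want to persist; adapt to your layout):

import org.apache.hadoop.fs.{FileSystem, Path}

val finalDir = new Path(".../main_folder/sub_folder")     // the folder the data was read from
val tempDir  = new Path(".../main_folder/sub_folder_new") // any different folder

// 1) Save the DataFrame to a different avro folder.
dataset.write.format("avro").mode("overwrite").save(tempDir.toString)

// 2) Delete the old avro folder.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.delete(finalDir, true) // recursive delete

// 3) Rename the newly created folder to the old name.
fs.rename(tempDir, finalDir)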


0 votes

Thanks a lot, Ram Ghadiyaram!

Solution 2 did fix my problem, but only on my local Ubuntu machine. When I tested on HDFS, the problem remained.

Solution 1 is therefore the definite workaround. This is how my code looks now:

private def doWriteToPath[T <: Product with Serializable: TypeTag: ClassTag](dataset: Dataset[T], path: String): Unit = {

// clear any previously cached avro
sparkSession.catalog.clearCache()

// update the cache for this particular dataset, and trigger an action
dataset.cache().show(1)

dataset.write
  .format("avro")
  .option("path", path)
  .mode(SaveMode.Overwrite)
  .save()    
}

A few more remarks: I did check that post and tried its solutions without success. I did not think it was my problem, for the following reasons:

  • I created a temporary folder named sub_folder_temp under main_folder, but saving still failed.
  • Saving the same non-empty dataset to the same path, but in JSON format, actually works without the workaround discussed here.
  • Saving an empty dataset of the same type [T] to the same path actually works without the workaround discussed here.