[随机出现] [Spark ML ALS] [AWS EMR]检查点文件夹中的FileNotFoundException,但文件存在

问题描述 投票:0回答:1

我正在AWS EMR上运行一个计划的(每天一次)spark应用程序,该应用程序是基于spark.ml.recommendation.ALS的推荐算法,数据位于AWS S3上,该应用程序将建议输出到一组用户。

为了确保迭代算法能够稳定运行,我使用了spark的检查点函数。我在AWS S3上设置了检查点文件夹。

有时一切正常。但是有时,即使文件实际存在,Spark应用程序也无法在检查点文件夹中找到该文件。我不知道为什么。

这是典型的错误日志:19/10/30 13:46:01警告TaskSetManager:在873.0阶段丢失了任务5.0(TID 12169,ip-10-79-9-182.us-west-2.compute.internal,executor 5):java.io .FileNotFoundException:没有这样的文件或目录:s3a:// bucket-name / checkpoint / 8f63442c-dd06-45d8-8e3a-ec30634b1a2f / rdd-2166 / part-00005在org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)在org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)在org.apache.spark.rdd.ReliableCheckpointRDD $ .readCheckpointFile(ReliableCheckpointRDD.scala:292)在org.apache.spark.rdd.ReliableCheckpointRDD.compute(ReliableCheckpointRDD.scala:100)在org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)

我检查了S3存储上是否确实存在s3a:// bucket-name / checkpoint / 8f63442c-dd06-45d8-8e3a-ec30634b1a2f / rdd-2166 / part-00005。

我的详细步骤如下:

  1. 在s3上创建检查点文件夹;
  2. 将spark的CheckpointDir设置为刚刚创建的文件夹;
  3. 运行算法;
  4. 删除检查点文件夹进行清理。

这是我的scala代码:

//step 1
val pathString = "s3a://bucket-name/checkpoint"
val path = new Path(pathString)
val fileSystem = FileSystem.get(path.toUri, sparkContext.hadoopConfiguration)
fileSystem.mkdirs(path)

//step 2
sparkContext.setCheckpointDir(pathString)

//step 3
//... lots of data that not so relevant
val als = new ALS()
      .setRank(10)
      .setMaxIter(20)
      .setUserCol("userId")
      .setItemCol("clubId")
      .setRatingCol("rating")
      .setCheckpointInterval(10)
      .setColdStartStrategy("drop")
      .setPredictionCol("prediction")
//... another lots of data that not so relevant

//step 4
fileSystem.delete(path, recursive = true)
scala apache-spark-ml checkpoint
1个回答
0
投票

S3最终是一致的-如果客户端在创建文件之前执行HEAD,有时404可以在负载均衡器中进行缓存-然后在后续的HEAD / GET请求中,404(a)返回(b)刷新缓存条目,因此它仍然存在

© www.soinside.com 2019 - 2024. All rights reserved.