我正在AWS EMR上运行一个计划的(每天一次)spark应用程序,该应用程序是基于spark.ml.recommendation.ALS的推荐算法,数据位于AWS S3上,该应用程序将建议输出到一组用户。
为了确保迭代算法能够稳定运行,我使用了spark的检查点函数。我在AWS S3上设置了检查点文件夹。
有时一切正常。但是有时,即使文件实际存在,Spark应用程序也无法在检查点文件夹中找到该文件。我不知道为什么。
这是典型的错误日志:19/10/30 13:46:01警告TaskSetManager:在873.0阶段丢失了任务5.0(TID 12169,ip-10-79-9-182.us-west-2.compute.internal,executor 5):java.io .FileNotFoundException:没有这样的文件或目录:s3a:// bucket-name / checkpoint / 8f63442c-dd06-45d8-8e3a-ec30634b1a2f / rdd-2166 / part-00005在org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1642)在org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:521)在org.apache.spark.rdd.ReliableCheckpointRDD $ .readCheckpointFile(ReliableCheckpointRDD.scala:292)在org.apache.spark.rdd.ReliableCheckpointRDD.compute(ReliableCheckpointRDD.scala:100)在org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
我检查了S3存储上是否确实存在s3a:// bucket-name / checkpoint / 8f63442c-dd06-45d8-8e3a-ec30634b1a2f / rdd-2166 / part-00005。
我的详细步骤如下:
这是我的scala代码:
//step 1
val pathString = "s3a://bucket-name/checkpoint"
val path = new Path(pathString)
val fileSystem = FileSystem.get(path.toUri, sparkContext.hadoopConfiguration)
fileSystem.mkdirs(path)
//step 2
sparkContext.setCheckpointDir(pathString)
//step 3
//... lots of data that not so relevant
val als = new ALS()
.setRank(10)
.setMaxIter(20)
.setUserCol("userId")
.setItemCol("clubId")
.setRatingCol("rating")
.setCheckpointInterval(10)
.setColdStartStrategy("drop")
.setPredictionCol("prediction")
//... another lots of data that not so relevant
//step 4
fileSystem.delete(path, recursive = true)
S3最终是一致的-如果客户端在创建文件之前执行HEAD,有时404可以在负载均衡器中进行缓存-然后在后续的HEAD / GET请求中,404(a)返回(b)刷新缓存条目,因此它仍然存在