My Spark job is hitting the issue below. It ran fine for a long time, but now it fails with this error and I have not been able to find a solution. Please help.
We are running it on Kubernetes.
Caused by: java.lang.IllegalStateException: Error reading streaming state file of s3a://table-onlineofflineinternet-store/checkpointing/abcd/temp_table/state/0/6/563042.changelog does not exist.
If the stream job is restarted with a new or updated state operation, please create a new checkpoint location or clear the existing checkpoint location.
at org.apache.spark.sql.execution.streaming.state.StateStoreChangelogReader.liftedTree1$1(StateStoreChangelog.scala:136)
at org.apache.spark.sql.execution.streaming.state.StateStoreChangelogReader.<init>(StateStoreChangelog.scala:129)
at org.apache.spark.sql.execution.streaming.state.RocksDBFileManager.getChangelogReader(RocksDBFileManager.scala:159)
at org.apache.spark.sql.execution.streaming.state.RocksDB.$anonfun$replayChangelog$1(RocksDB.scala:196)
at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
at scala.collection.immutable.NumericRange.foreach(NumericRange.scala:75)
at org.apache.spark.sql.execution.streaming.state.RocksDB.replayChangelog(RocksDB.scala:193)
at org.apache.spark.sql.execution.streaming.state.RocksDB.load(RocksDB.scala:166)
at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.getReadStore(RocksDBStateStoreProvider.scala:200)
at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.getReadStore(RocksDBStateStoreProvider.scala:30)
at org.apache.spark.sql.execution.streaming.state.StateStore$.getReadOnly(StateStore.scala:492)
at org.apache.spark.sql.execution.streaming.state.ReadStateStoreRDD.compute(StateStoreRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
I hope the Spark community can take a look at how to resolve this. For reference, my Spark code is written in Scala.
S3 is not a real filesystem; files written to it only become visible on close(), so if a process is killed, any changelog it was writing never becomes visible. Are you sure the Spark Streaming checkpointer you are using is compatible with S3? By "sure" I mean: is it explicitly documented to work?

Set

spark.hadoop.fs.s3a.downgrade.syncable.exceptions

to false, and then any attempt by the checkpointer to persist data through the filesystem hflush/hsync APIs will fail fast. This will not fix your problem, but it will confirm whether an incompatible checkpointer is in use.
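As a minimal sketch of that probe, assuming the S3A connector from Hadoop 3.3.1 or later (where this property exists), the setting can be applied when building the session in Scala; the application name here is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

// Probe, not a fix: with this property set to false, S3A throws
// UnsupportedOperationException on hflush()/hsync() instead of
// downgrading them to a logged no-op. Any state-store checkpointer
// that relies on those APIs will then fail fast and loudly.
val spark = SparkSession.builder()
  .appName("changelog-checkpoint-probe") // hypothetical name
  .config("spark.hadoop.fs.s3a.downgrade.syncable.exceptions", "false")
  .getOrCreate()
```

The same flag can equally be passed on the command line via `--conf spark.hadoop.fs.s3a.downgrade.syncable.exceptions=false`, which avoids a code change while testing.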