Spark Structured Streaming StateStore exception when using RocksDBStateStoreProvider

Question

I am developing a Spark Structured Streaming application with stream-stream join logic, using RocksDB as the backbone for storing state. Kafka is the source of my streams, and I run Spark in local mode with Java 17 and Spark 3.5.0.

I want RocksDB to back my application's state, so I set the state store provider to org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider via spark.sql.streaming.stateStore.providerClass. After running several batches, the stream-stream join fails with the exception below. I have cleaned the checkpoint directory and re-run the application several times, and the exception occurs every time. The join IDs across my source streams have a many-to-one relationship, and I cannot find any data corruption in the sources. I am using a Windows directory as the checkpoint location. I also tried Spark 3.4.0, but the exception still occurs.

I would appreciate any insight or guidance on resolving this. Thanks!

```
org.rocksdb.RocksDBException: Mismatch in unique ID on table file 27. Expected: {7568779327299478048,6781697239205038417} Actual: {7568779327299478056,2042577083335893353} in file C:\Windows\Temp\spark-fed38dd1-aae2-45ab-ad2f-8e0f02ea223b\StateStoreId(opId=0,partId=1,name=left-keyWithIndexToValue)-a692f8b0-e03c-4475-8925-0d589f67d628\workingDir-c71dca44-dcbc-4057-91d2-afce3d7cb7b4/MANIFEST-000005
	at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-8.3.2.jar!/:?]
	at org.rocksdb.RocksDB.open(RocksDB.java:249) ~[rocksdbjni-8.3.2.jar!/:?]
	at org.apache.spark.sql.execution.streaming.state.RocksDB.openDB(RocksDB.scala:584) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.state.RocksDB.load(RocksDB.scala:154) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.getStore(RocksDBStateStoreProvider.scala:194) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:507) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$StateStoreHandler.getStateStore(SymmetricHashJoinStateManager.scala:417) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueStore.<init>(SymmetricHashJoinStateManager.scala:600) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.<init>(SymmetricHashJoinStateManager.scala:386) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.<init>(StreamingSymmetricHashJoinExec.scala:529) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.processPartitions(StreamingSymmetricHashJoinExec.scala:276) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$doExecute$1(StreamingSymmetricHashJoinExec.scala:241) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$doExecute$1$adapted(StreamingSymmetricHashJoinExec.scala:241) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper$StateStoreAwareZipPartitionsRDD.compute(StreamingSymmetricHashJoinHelper.scala:295) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) ~[spark-common-utils_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) ~[spark-common-utils_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) [spark-core_2.12-3.5.0.jar!/:3.5.0]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
```
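For context, here is a minimal sketch of the kind of setup described above. The topic names, column names, join condition, watermarks, and checkpoint path are hypothetical placeholders, not the original application's code, and it requires the spark-sql-kafka-0-10 connector on the classpath:

```java
import static org.apache.spark.sql.functions.expr;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class StreamStreamJoinApp {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("stream-stream-join")
                .master("local[*]")
                // Use RocksDB for streaming state instead of the default provider
                .config("spark.sql.streaming.stateStore.providerClass",
                        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
                .getOrCreate();

        // Left Kafka source; the Kafka source exposes key/value/timestamp columns
        Dataset<Row> left = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "left-topic")
                .load()
                .selectExpr("CAST(key AS STRING) AS joinId", "timestamp AS leftTime")
                .withWatermark("leftTime", "10 minutes");

        // Right Kafka source
        Dataset<Row> right = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "right-topic")
                .load()
                .selectExpr("CAST(key AS STRING) AS rightJoinId", "timestamp AS rightTime")
                .withWatermark("rightTime", "10 minutes");

        // Stream-stream inner join; per-side join state is kept in the state store,
        // and the time-range condition lets Spark eventually evict old state
        Dataset<Row> joined = left.join(right, expr(
                "joinId = rightJoinId AND " +
                "rightTime BETWEEN leftTime AND leftTime + interval 5 minutes"));

        joined.writeStream()
                .format("console")
                .option("checkpointLocation", "C:/tmp/checkpoints/join-app") // Windows path, as in the question
                .start()
                .awaitTermination();
    }
}
```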

My use case is very similar: I use Kafka as the source of my streams, and I perform a lot of stateful operations.
java apache-spark apache-spark-sql spark-structured-streaming rocksdb
1 Answer

I tried a few things, but nothing worked. I eventually downgraded Spark to version 3.3.2 and everything went back to normal.

If you really want to stick with 3.5.0, I think setting spark.shuffle.service.db.backend to LEVELDB and removing the RocksDBStateStoreProvider setting should solve your problem.
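If you go that route, the configuration would look roughly like the sketch below. This is an assumption of how the suggestion translates to code: spark.shuffle.service.db.backend (added in Spark 3.4.0, values LEVELDB or ROCKSDB) controls the database used by the shuffle service, and leaving spark.sql.streaming.stateStore.providerClass unset makes streaming state fall back to the default HDFSBackedStateStoreProvider.

```java
import org.apache.spark.sql.SparkSession;

public class FallbackConfig {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("stream-stream-join")
                .master("local[*]")
                // Back the shuffle service's database with LevelDB instead of RocksDB
                .config("spark.shuffle.service.db.backend", "LEVELDB")
                // spark.sql.streaming.stateStore.providerClass is deliberately NOT set,
                // so streaming state uses the default HDFSBackedStateStoreProvider
                .getOrCreate();
        spark.stop();
    }
}
```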
