我正在尝试使用 pyspark.pandas 编写增量表,但面临以下错误。 无法弄清楚这里出了什么问题,因为具有更大行数的数据帧正在成功处理。每次崩溃都是这种情况。
我正在使用具有以下规格的azure databricks集群:
11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12)
4-7 Workers
256-448 GB Memory
128-224 Cores
1 Driver
32 GB Memory, 16 Cores
Runtime
11.3.x-scala2.12
combined_df_final.to_delta(file_path,mode ='append')
An error occurred while calling o15684.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:882)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$1(FileFormatWriter.scala:334)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:154)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDeltaCommand.run(WriteIntoDeltaCommand.scala:70)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$writeFiles$11(TransactionalWriteEdge.scala:571)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:250)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:400)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:195)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:149)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:350)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$writeFiles$1(TransactionalWriteEdge.scala:571)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag(DeltaLogging.scala:193)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag$(DeltaLogging.scala:180)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.withOperationTypeTag(OptimisticTransaction.scala:112)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$2(DeltaLogging.scala:157)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:262)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:260)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:112)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:156)
at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:547)
at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:642)
at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:663)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:404)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:147)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:402)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:399)
at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:26)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:447)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:432)
at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:26)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:637)
at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:556)
at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:26)
at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:547)
at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:517)
at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:26)
at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:66)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:148)
at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:72)
at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:59)
at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:107)
at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:433)
at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:412)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:112)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:155)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:145)
at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:133)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:112)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$recordWriteFilesOperation$1(TransactionalWriteEdge.scala:307)
at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:1837)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.recordWriteFilesOperation(TransactionalWriteEdge.scala:306)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles(TransactionalWriteEdge.scala:339)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles$(TransactionalWriteEdge.scala:333)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:112)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles(TransactionalWriteEdge.scala:621)
at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles$(TransactionalWriteEdge.scala:611)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:112)
at com.databricks.sql.transaction.tahoe.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:226)
at com.databricks.sql.transaction.tahoe.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:223)
at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:112)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.write(WriteIntoDelta.scala:391)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.$anonfun$run$2(WriteIntoDelta.scala:111)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.$anonfun$run$2$adapted(WriteIntoDelta.scala:100)
at com.databricks.sql.transaction.tahoe.DeltaLog.withNewTransaction(DeltaLog.scala:312)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:100)
at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:1837)
at com.databricks.sql.transaction.tahoe.commands.WriteIntoDelta.run(WriteIntoDelta.scala:99)
at com.databricks.sql.transaction.tahoe.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:168)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:49)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:78)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:89)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:247)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:250)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:400)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:195)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:149)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:350)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:247)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:232)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:245)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:238)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:99)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:298)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:294)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:238)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:354)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:238)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:192)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:183)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:274)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:965)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:430)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:339)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:251)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:306)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 596.0 failed 4 times, most recent failure: Lost task 0.3 in stage 596.0 (TID 4792) (172.16.177.136 executor 9): org.apache.spark.SparkException: Checkpoint block rdd_1733_0 not found! Either the executor that originally checkpointed this partition is no longer alive, or the original RDD is unpersisted. If this problem persists, you may consider using `rdd.checkpoint()` instead, which is slower than local checkpointing but more fault-tolerant.
使用正确的 Spark 版本解决了问题。是版本错误。