Job 65 cancelled because SparkContext was shut down


I work on a shared Apache Zeppelin server. Almost every day I try to run a command and get this error: Job 65 cancelled because SparkContext was shut down

I would love to learn what causes the SparkContext to shut down. My understanding is that Zeppelin is a Kubernetes application that sends commands out to a machine for processing.

When the SparkContext shuts down, does that mean my bridge to the Spark cluster has shut down? And, if so, how could I have caused the bridge to the Spark cluster to go down?

In this example, it happened while I was trying to upload data to S3.

Here is the code:

val myfiles = readParquet(
    startDate=new LocalDate(2020, 4, 1),
    endDate=new LocalDate(2020, 4, 7)
)

myfiles.createOrReplaceTempView("myfiles") // register the view the SQL below selects from
log_events.createOrReplaceTempView("log_events")

val mySQLDF = spark.sql(s"""
    select [6 columns]
    from myfiles 
    join [other table]
    on [join_condition]
"""
)

mySQLDF.write.option("maxRecordsPerFile", 1000000).parquet(path)
// mySQLDF has 3M rows and they're all strings or dates

Here is the error stack trace:

org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
  at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
  ... 48 elided
Caused by: org.apache.spark.SparkException: Job 44 cancelled because SparkContext was shut down
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:972)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:970)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:970)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2286)
  at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
  at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2193)
  at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
  at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
  at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:121)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
  ... 70 more
apache-spark hadoop pyspark apache-spark-sql apache-zeppelin
1 Answer (score: 3)

Your job was aborted at the write step. Job aborted. is the exception message, and that is what caused the SparkContext to be shut down.

Look at optimizing the write step. maxRecordsPerFile may be the culprit; maybe try a lower number... you currently have 1M records going into a single file!
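
A rough sketch of that suggestion, reusing mySQLDF and path from the question; the 64 partitions and 250000 records per file are arbitrary example values to tune for your data, not recommendations:

// Repartitioning and lowering maxRecordsPerFile spreads the write across more,
// smaller tasks and files, which reduces the memory pressure on each write task.
mySQLDF
  .repartition(64)                     // example value only; tune for your cluster
  .write
  .option("maxRecordsPerFile", 250000) // smaller than the original 1M records per file
  .parquet(path)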


In general, Job ${job.jobId} cancelled because SparkContext was shut down just means that an exception occurred, the DAG could not continue because of it, and the job had to fail. The Spark scheduler throws this error when it hits an exception - possibly one your code does not handle, or a job that failed for some other reason - and when the DAG scheduler is stopped, the whole application is stopped (this message is part of that cleanup).
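
To illustrate the "unhandled exception" case, here is a hedged sketch that wraps the question's write in a Try so the nested root cause gets printed instead of only the generic messages; mySQLDF and path are again taken from the question:

import org.apache.spark.SparkException
import scala.util.{Failure, Success, Try}

// Run the same write, but walk the cause chain of any SparkException,
// because the real failure (an executor OOM, an S3 error, ...) is usually nested.
Try(mySQLDF.write.option("maxRecordsPerFile", 1000000).parquet(path)) match {
  case Success(_) =>
    println(s"write to $path finished")
  case Failure(e: SparkException) =>
    Iterator.iterate[Throwable](e)(_.getCause).takeWhile(_ != null).foreach(t => println(t))
  case Failure(other) =>
    throw other
}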


To answer your questions -

When the SparkContext shuts down, does that mean my bridge to the Spark cluster has shut down?

The SparkContext represents the connection to the Spark cluster, so if it dies it means you can no longer run jobs on it, because you have lost the link! In Zeppelin you can simply restart the SparkContext (Menu -> Interpreter -> Spark Interpreter -> restart).
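
A small sanity check you can run in a paragraph before retrying, assuming spark is the SparkSession that Zeppelin injects:

// isStopped reports whether the current SparkContext has already been shut down.
if (spark.sparkContext.isStopped) {
  println("SparkContext is stopped - restart the Spark interpreter before rerunning this paragraph")
} else {
  println(s"SparkContext is alive, application id: ${spark.sparkContext.applicationId}")
}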

And if so, how could I have caused the bridge to the Spark cluster to go down?

Through a SparkException raised in one of your jobs, or manually with sc.stop().
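
For completeness, a sketch of the manual case: stopping the context cancels any job that is still running with exactly this message, and later actions fail because the connection to the cluster is gone.

// After this call, the "bridge" to the cluster no longer exists for this application.
spark.sparkContext.stop()

// e.g. spark.range(10).count() would now fail against the stopped context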
