一段时间后停止Spark会话-Pyspark

问题描述 投票:0回答:1

我正在执行ETL有时会花费很多时间。我想在一段时间后正常关闭Spark会话。

我正在Pyspark中编写代码。

try:
 df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output)
exception:
 spark.stop()

我想在上述代码中的某些时间后停止火花。

是否有办法在一段时间后正常关闭Spark会话?

python apache-spark pyspark timer exit
1个回答
0
投票

我建议使用官方的python Timer正常停止Spark会话:

import threading

def timer_elapsed():
    print('Timer elapsed')
    if not sc._jsc.sc().isStopped():
      spark.stop()

# wait for 0.5 sec for Spark job to complete
spark_timer = threading.Timer(0.5, timer_elapsed)
spark_timer.start()

try:
  df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output
  print('Spark job finished successfully.')
except Exception as e:
  spark_timer.cancel() # stop timer, we don't need to wait if error occured
  if not sc._jsc.sc().isStopped():
    spark.stop()

注:如果时间已过或捕获到异常,我们会在两种情况下停止会话。在请求停止Spark上下文之前,我们检查该上下文是否正在使用直接调用Java API的sc._jsc.sc().isStopped运行。

© www.soinside.com 2019 - 2024. All rights reserved.