我正在执行ETL有时会花费很多时间。我想在一段时间后正常关闭Spark会话。
我正在Pyspark中编写代码。
try:
df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output)
exception:
spark.stop()
我想在上述代码中的某些时间后停止火花。
是否有办法在一段时间后正常关闭Spark会话?
我建议使用官方的python Timer正常停止Spark会话:
import threading
def timer_elapsed():
print('Timer elapsed')
if not sc._jsc.sc().isStopped():
spark.stop()
# wait for 0.5 sec for Spark job to complete
spark_timer = threading.Timer(0.5, timer_elapsed)
spark_timer.start()
try:
df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output
print('Spark job finished successfully.')
except Exception as e:
spark_timer.cancel() # stop timer, we don't need to wait if error occured
if not sc._jsc.sc().isStopped():
spark.stop()
注:如果时间已过或捕获到异常,我们会在两种情况下停止会话。在请求停止Spark上下文之前,我们检查该上下文是否正在使用直接调用Java API的sc._jsc.sc().isStopped
运行。