我需要执行以下操作:
检查服务器是否已启动,如果是,我检查该服务器中的 Spark 集群是否已启动,如果它已关闭,我尝试启动它,如果它已经启动,我将继续运行我的 Spark 作业。
我想创建一个任务来检查 Spark 集群是否已启动(也许尝试运行一个简单的 Spark 作业)。如果失败,我将启动“启动 Spark 集群”任务。
我正在使用 Airflow,但没有找到一种方法来触发任务,以防前一个任务失败。除此之外,我需要检查前一个任务,以防它成功,这样它将分支到 Spark 作业任务并跳过“启动 Spark 集群”任务。
如果您能提供一些样品那就太好了。我尝试使用trigger_rule和分支运算符,但到目前为止什么也没得到。也许是因为网上关于它们的代码示例太少了。
谢谢
您可以使用PythonBranhOperator。 这是一个代码示例供您参考。
VALID_TEST_OUTPUT = 'valid-output'
INVALID_TEST_OUTPUT = 'invalid-test-output'
test_job=PythonOperator(
## your task
)
def select_branch_after_test(**kwargs):
'''
:return: returns the task_id to proceed with.
'''
if com.getoutput('hadoop fs -ls /user/praveen/.test_out/ | grep file1'):
follow_previous_job = VALID_TEST_OUTPUT
else:
follow_previous_job = INVALID_TEST_OUTPUT
return follow_previous_job
# branching based on successful test output
branching = BranchPythonOperator(task_id=BRANCH_TEST_OUTPUT,
python_callable=select_branch_after_test,
dag=dag)
branching.set_upstream(test_job)
valid_test_output = PythonOperator(
task_id=VALID_TEST_OUTPUT,
python_callable=lambda **kwargs: logging.info("test genereated valid output, proceeding with other steps"),
provide_context=True,
dag=dag)
valid_test_output.set_upstream(branching)
end_task = PythonOperator(
task_id=END,
python_callable=pipeline_run_time,
provide_context=True,
trigger_rule='one_success', ## Important
dag=dag)
invalid_test_output = PythonOperator(
task_id=INVALID_TEST_OUTPUT,
python_callable=lambda **kwargs: logging.info("test did not create valid output, completeing branching dag to other path"),
provide_context=True,
dag=dag)
invalid_test_output.set_upstream(branching)
valid_test_output.set_downstream(end_task)
invalid_test_output.set_downstream(end_task)
`