如果 Apache Airflow 中的前一个任务失败,如何运行任务

问题描述 投票:0回答:1

我需要执行以下操作:

检查服务器是否已启动,如果是,我检查该服务器中的 Spark 集群是否已启动,如果它已关闭,我尝试启动它,如果它已经启动,我将继续运行我的 Spark 作业。

我想创建一个任务来检查 Spark 集群是否已启动(也许尝试运行一个简单的 Spark 作业)。如果失败,我将启动“启动 Spark 集群”任务。

我正在使用 Airflow,但没有找到一种方法来触发任务,以防前一个任务失败。除此之外,我需要检查前一个任务,以防它成功,这样它将分支到 Spark 作业任务并跳过“启动 Spark 集群”任务。

如果您能提供一些样品那就太好了。我尝试使用trigger_rule和分支运算符,但到目前为止什么也没得到。也许是因为网上关于它们的代码示例太少了。

谢谢

python triggers controls airflow
1个回答
0
投票

您可以使用PythonBranhOperator。 这是一个代码示例供您参考。

VALID_TEST_OUTPUT = 'valid-output'
INVALID_TEST_OUTPUT = 'invalid-test-output'


test_job=PythonOperator(
    ## your task
)

def select_branch_after_test(**kwargs):
    '''
    :return: returns the task_id to proceed with.
    '''
    if com.getoutput('hadoop fs -ls /user/praveen/.test_out/ | grep file1'):
        follow_previous_job = VALID_TEST_OUTPUT
    else:
        follow_previous_job = INVALID_TEST_OUTPUT
    return follow_previous_job


# branching based on successful test output
branching = BranchPythonOperator(task_id=BRANCH_TEST_OUTPUT,
                                 python_callable=select_branch_after_test,
                                 dag=dag)
branching.set_upstream(test_job)

valid_test_output = PythonOperator(
    task_id=VALID_TEST_OUTPUT,
    python_callable=lambda **kwargs: logging.info("test genereated valid output, proceeding with other steps"),
    provide_context=True,
    dag=dag)
valid_test_output.set_upstream(branching)

end_task = PythonOperator(
    task_id=END,
    python_callable=pipeline_run_time,
    provide_context=True,
    trigger_rule='one_success',  ## Important
    dag=dag)

invalid_test_output = PythonOperator(
    task_id=INVALID_TEST_OUTPUT,
    python_callable=lambda **kwargs: logging.info("test did not create valid output, completeing branching dag to other path"),
    provide_context=True,
    dag=dag)
invalid_test_output.set_upstream(branching)

valid_test_output.set_downstream(end_task)
invalid_test_output.set_downstream(end_task)

`

© www.soinside.com 2019 - 2024. All rights reserved.