我想在Airflow中创建一个条件任务,如下面的架构中所述。预期的情况如下:
以上所有任务都是SSHExecuteOperator。我猜我应该使用ShortCircuitOperator和/或XCom来管理这个条件,但我不知道如何实现它。你能描述一下解决方案吗?
所有运算符都有一个trigger_rule参数,该参数定义生成的任务被触发的规则。
触发规则的可能性:
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
以下是解决问题的想法:
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)
task_1 = SSHExecuteOperator(
task_id='task_1',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2 = SSHExecuteOperator(
task_id='conditional_task',
bash_command=<YOUR COMMAND>,
ssh_hook=sshHook,
dag=dag)
task_2a = SSHExecuteOperator(
task_id='task_2a',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2b = SSHExecuteOperator(
task_id='task_2b',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ALL_FAILED,
ssh_hook=sshHook,
dag=dag)
task_3 = SSHExecuteOperator(
task_id='task_3',
bash_command=<YOUR COMMAND>,
trigger_rule=TriggerRule.ONE_SUCCESS,
ssh_hook=sshHook,
dag=dag)
task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
气流具有BranchPythonOperator,可用于更直接地表达分支依赖性。
docs描述了它的用途:
BranchPythonOperator与PythonOperator非常相似,只是它需要一个返回task_id的python_callable。返回返回的task_id,并跳过所有其他路径。 Python函数返回的task_id必须直接引用BranchPythonOperator任务下游的任务。
...
如果你想跳过一些任务,请记住你不能有一个空路径,如果是这样,那就做一个虚拟任务。
def dummy_test():
return 'branch_a'
A_task = DummyOperator(task_id='branch_a', dag=dag)
B_task = DummyOperator(task_id='branch_false', dag=dag)
branch_task = BranchPythonOperator(
task_id='branching',
python_callable=dummy_test,
dag=dag,
)
branch_task >> A_task
branch_task >> B_task
如果您正在安装Airflow版本> = 1.10.3,您还可以使用return a list of task ids,允许您在单个Operator和don't use a dummy task before joining中跳过多个下游路径。