我有一个分支任务,该任务依赖于XCOM直接在上游设置的XCOM。上游任务ID是通过诸如task_1,task_2..task_n之类的循环生成的。所以像这样:
task_n >> branch[task_a, task_b]
分支是否可以通过其直接上游访问XCOM集的方法?我知道我可以使用op_kwargs并将任务ID传递给分支。我只是想看看是否还有更多的Airflow本机方式。
PythonBranchOperator
应该用provide_context=True
创建,并且可调用的python看起来像这样:
def branch_callable(task_instance, task, **kwargs):
upstream_ids = task.upstream_task_ids # an iterable
xcoms = task_instance.xcom_pull(task_ids=upstream_ids)
# process the xcoms of the direct upstream tasks