在 Apache Airflow 中,如何使用 Airflow TaskFlow API 在不同 dag 中的任务之间交换数据?

问题描述 投票:0回答:1

我有 2 个 dag,如下所示(请注意,第二个 dag 在第一个 dag 之后自动运行 - 它们由气流数据集链接)。有没有办法让第二个 dag 检索第一个 dag 的 push_data 任务返回的值?

(现在我收到的是 '{{ task_instance.xcom_pull(task_ids='push_data', dag_id='xcom_test_start', key='return_value') }}')

uiflow_xcom_test_dataset = Dataset("uiflow_xcom_test")

@dag(
    dag_id="xcom_test_start",
    default_args=default_dag_arguments,
    schedule_interval="@once",
    start_date=pendulum.datetime(2024, 1, 1, tz="US/Eastern"),
)
def xcom_test_start():

    @task(outlets=[ uiflow_xcom_test_dataset ])
    def push_data():

        return "sample data"

    push_data()

first_dag = xcom_test_start()


@dag(
    dag_id="xcom_test_end",
    default_args=default_dag_arguments,
    schedule=[ uiflow_xcom_test_dataset ],
    tags=["utility"]
)
def xcom_test_end():

    @task
    def pull_data():

        data = first_dag.tasks[0].output

        print(f"data pulled is {data}")

    pull_data()

xcom_test_end()
airflow airflow-taskflow
1个回答
0
投票

您可以使用 xcom_pull 而不是“first_dag.tasks[0].output”,例如

    @task()
    def pull_data(**kwargs):
        # Pull data from XCom
        ti = kwargs['ti']
        data = ti.xcom_pull(task_ids='push_data', dag_id='xcom_test_start')
        print(f"data pulled is {data}")

第二个喜欢的完整代码

@dag(
    dag_id="xcom_test_end",
    default_args=default_dag_arguments,
    schedule_interval="@once",  # Schedule accordingly
    tags=["utility"]
)
def xcom_test_end():

    @task()
    def pull_data(**kwargs):
        # Pull data from XCom
        ti = kwargs['ti']
        data = ti.xcom_pull(task_ids='push_data', dag_id='xcom_test_start')
        print(f"data pulled is {data}")

    # Call the pull_data task
    pull_data_task = pull_data()

    return pull_data_task


second_dag = xcom_test_end()
© www.soinside.com 2019 - 2024. All rights reserved.