我有 2 个 dag,如下所示(请注意,第二个 dag 在第一个 dag 之后自动运行 - 它们由气流数据集链接)。有没有办法让第二个 dag 检索第一个 dag 的 push_data 任务返回的值?
(现在我收到的是 '{{ task_instance.xcom_pull(task_ids='push_data', dag_id='xcom_test_start', key='return_value') }}')
uiflow_xcom_test_dataset = Dataset("uiflow_xcom_test")
@dag(
dag_id="xcom_test_start",
default_args=default_dag_arguments,
schedule_interval="@once",
start_date=pendulum.datetime(2024, 1, 1, tz="US/Eastern"),
)
def xcom_test_start():
@task(outlets=[ uiflow_xcom_test_dataset ])
def push_data():
return "sample data"
push_data()
first_dag = xcom_test_start()
@dag(
dag_id="xcom_test_end",
default_args=default_dag_arguments,
schedule=[ uiflow_xcom_test_dataset ],
tags=["utility"]
)
def xcom_test_end():
@task
def pull_data():
data = first_dag.tasks[0].output
print(f"data pulled is {data}")
pull_data()
xcom_test_end()
您可以使用 xcom_pull 而不是“first_dag.tasks[0].output”,例如
@task()
def pull_data(**kwargs):
# Pull data from XCom
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='push_data', dag_id='xcom_test_start')
print(f"data pulled is {data}")
第二个喜欢的完整代码
@dag(
dag_id="xcom_test_end",
default_args=default_dag_arguments,
schedule_interval="@once", # Schedule accordingly
tags=["utility"]
)
def xcom_test_end():
@task()
def pull_data(**kwargs):
# Pull data from XCom
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='push_data', dag_id='xcom_test_start')
print(f"data pulled is {data}")
# Call the pull_data task
pull_data_task = pull_data()
return pull_data_task
second_dag = xcom_test_end()