Airflow中通过xcom引用的文件名

问题描述 投票:1回答:1

我试图了解如何通过airflow xcom功能传递值。我要构建的特定用例是编写一个文件,然后移动它,然后运行另一个命令。我的想法是将文件名从一个运算符传递给下一个。

这是我所拥有的:

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt

DAG = DAG(
  dag_id='xcom_test_dag',
  start_date=dt.datetime.now(),
  schedule_interval='@once'
)

def push_function(**context):
    file_name = 'test_file_{date}'.format(date=dt.datetime.now())
    return context['task_instance'].xcom_push(key='filename', value=file_name)

def pull_function(**context):
    dir(context['task_instance'].xcom_pull())

push_task = PythonOperator(
    task_id='push_task', 
    python_callable=push_function,
    provide_context=True,
    dag=DAG)

pull_task = PythonOperator(
    task_id='pull_task', 
    python_callable=pull_function,
    provide_context=True,
    dag=DAG)

push_task >> pull_task

如果要在pull_task中引用文件名,以便可以执行读取文件-应该怎么称呼它?尝试访问context['task_instance']不包含值。进一步-尝试从任务到任务/从操作员到操作员引用这样的文件名是最佳实践吗?

airflow
1个回答
0
投票

从XCOM提取数据时,您希望提供将数据推送到的任务的任务ID。在您的示例中,您的推送任务的task_id为push_task,因此您需要执行以下操作:

value = context['task_instance'].xcom_pull(task_ids='push_task')

但是,从气流文档中,请注意:

默认情况下,xcom_pull()筛选通过执行功能返回而被推送给XCom的键(与手动推动的XCom相对应)。

如果要使用特定的键将数据手动推送到XCOM,则在调用xcom_pull时可能需要包括该键。在您的示例中,您在推送任务中推送了一个名为filename的键,因此您可能需要在推送任务中执行以下操作:

value = context['task_instance'].xcom_pull(task_ids='push_task', key='filename')

此信息在Airflow文档中有更详细的概述:https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#concepts-xcom

关于您有关“最佳实践”的问题-在气流任务/操作员之间进行通信,XCOM是最佳的选择。但是,如果要跨多个操作员从磁盘读取文件,则需要确保所有工作人员都有权访问文件的存储位置。如果无法做到这一点,则替代方法可能是使推送任务远程存储该文件(例如,在AWS S3中),然后将S3 URL推送到XCOM。然后,pull任务可以从XCOM读取S3 URL,并从S3下载文件。

© www.soinside.com 2019 - 2024. All rights reserved.