我试图了解如何通过airflow
xcom
功能传递值。我要构建的特定用例是编写一个文件,然后移动它,然后运行另一个命令。我的想法是将文件名从一个运算符传递给下一个。
这是我所拥有的:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import datetime as dt
DAG = DAG(
dag_id='xcom_test_dag',
start_date=dt.datetime.now(),
schedule_interval='@once'
)
def push_function(**context):
file_name = 'test_file_{date}'.format(date=dt.datetime.now())
return context['task_instance'].xcom_push(key='filename', value=file_name)
def pull_function(**context):
dir(context['task_instance'].xcom_pull())
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=DAG)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=DAG)
push_task >> pull_task
如果要在pull_task中引用文件名,以便可以执行读取文件-应该怎么称呼它?尝试访问context['task_instance']
不包含值。进一步-尝试从任务到任务/从操作员到操作员引用这样的文件名是最佳实践吗?
从XCOM提取数据时,您希望提供将数据推送到的任务的任务ID。在您的示例中,您的推送任务的task_id为push_task
,因此您需要执行以下操作:
value = context['task_instance'].xcom_pull(task_ids='push_task')
但是,从气流文档中,请注意:
默认情况下,xcom_pull()筛选通过执行功能返回而被推送给XCom的键(与手动推动的XCom相对应)。
如果要使用特定的键将数据手动推送到XCOM,则在调用
xcom_pull
时可能需要包括该键。在您的示例中,您在推送任务中推送了一个名为filename
的键,因此您可能需要在推送任务中执行以下操作:
value = context['task_instance'].xcom_pull(task_ids='push_task', key='filename')
此信息在Airflow文档中有更详细的概述:https://airflow.apache.org/docs/stable/concepts.html?highlight=xcom#concepts-xcom
关于您有关“最佳实践”的问题-在气流任务/操作员之间进行通信,XCOM是最佳的选择。但是,如果要跨多个操作员从磁盘读取文件,则需要确保所有工作人员都有权访问文件的存储位置。如果无法做到这一点,则替代方法可能是使推送任务远程存储该文件(例如,在AWS S3中),然后将S3 URL推送到XCOM。然后,pull任务可以从XCOM读取S3 URL,并从S3下载文件。
• How to add rows to a table using xcom_pull in PostgresOperator AIRFLOW
• 我们可以通过 airflow web-server UI 上传文件并触发 DAG 吗?
• Excel INDEX MATCH 通过标题名称将另一个文件匹配到动态位置,无需表引用
• Airflow:如何在 PythonOperator 中使用 dag_run 参数值作为 trigger_rule
• 如何将 jinja 模板传递给 Airflow DataBricksSubmitRunOperator 中的 execution_timeout 参数?
• 使用 DagBag 在 Cloud Composer 中测试 dags 时出现 ModuleNotFoundError
• airflow 如何使用 git sync 从 git 分支的 dag 文件夹中获取 dags
• 如何从不同的 python 文件触发或运行气流 dag。这就像开发一个将运行 dag 并验证其结果的测试用例
• 春云数据流和气流
• Attempting to do detail validation on a db created in sqlite 3 using python