import airflow
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from datetime import datetime, timedelta
default_args = {
'start_date': airflow.utils.dates.days_ago(0),
'retries': 0,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'custom_python_operator',
default_args=default_args,
description='Just runs templates',
schedule_interval='0 * * * *',
max_active_runs=1,
catchup=False,
dagrun_timeout=timedelta(minutes=10),
)
dataflow_job = DataflowCreatePythonJobOperator(
task_id='run_dataflow_pipeline',
py_file='./template_runner_scripts/MultiSoapAPI_repeat_bkp.py', # GCS path
job_name='direct-script-1617' ,
project_id='ai-data-ingestion-staging',
location='us-central1',
options={
'ip_configuration': 'WORKER_IP_PUBLIC'
},
py_requirements=['apache-beam[gcp]==2.21.0'],
py_interpreter='python3',
py_system_site_packages=False
)
dataflow_job
正如您在上面的代码中看到的,我们可以看到没有生成任何任务,代码目录和一切似乎都正常,我看不到任何错误。
任务未分配给 DAG。您可以为每一项任务显式设置
dag
参数,如下所示:
dag = DAG(
'custom_python_operator',
...,
)
dataflow_job = DataflowCreatePythonJobOperator(
task_id='run_dataflow_pipeline',
...
dag=dag,
)
或者,使用
DAG
对象作为上下文管理器。在这种方法中,您不需要为每个任务显式设置 dag
参数。
with DAG(
'custom_python_operator',
...,
):
dataflow_job = DataflowCreatePythonJobOperator(
task_id='run_dataflow_pipeline',
...,
)