无法使用 DataflowCreatePythonJobOperator 在 Google Cloud Composer 中启动任何任务

问题描述 投票:0回答:1
import airflow
from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import DataflowCreatePythonJobOperator
from datetime import datetime, timedelta

default_args = {
    'start_date': airflow.utils.dates.days_ago(0),
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'custom_python_operator',
    default_args=default_args,
    description='Just runs templates',
    schedule_interval='0 * * * *',
    max_active_runs=1,
    catchup=False,
    dagrun_timeout=timedelta(minutes=10),
)

dataflow_job = DataflowCreatePythonJobOperator(
    task_id='run_dataflow_pipeline',
    py_file='./template_runner_scripts/MultiSoapAPI_repeat_bkp.py',  # GCS path
    job_name='direct-script-1617' ,
    project_id='ai-data-ingestion-staging',
    location='us-central1',
    options={
        'ip_configuration': 'WORKER_IP_PUBLIC'
    },
    py_requirements=['apache-beam[gcp]==2.21.0'],
    py_interpreter='python3',
    py_system_site_packages=False
)

dataflow_job

composer ui 中没有生成任何任务

正如您在上面的代码中看到的,我们可以看到没有生成任何任务,代码目录和一切似乎都正常,我看不到任何错误。

airflow directed-acyclic-graphs google-cloud-composer
1个回答
1
投票

任务未分配给 DAG。您可以为每一项任务显式设置

dag
参数,如下所示:


dag = DAG(
    'custom_python_operator',
    ...,
)

dataflow_job = DataflowCreatePythonJobOperator(
    task_id='run_dataflow_pipeline',
    ...
    dag=dag,
)

或者,使用

DAG
对象作为上下文管理器。在这种方法中,您不需要为每个任务显式设置
dag
参数。

with DAG(
    'custom_python_operator',
    ...,
):
    dataflow_job = DataflowCreatePythonJobOperator(
    task_id='run_dataflow_pipeline',
    ...,
    )
© www.soinside.com 2019 - 2024. All rights reserved.