Adding a custom operator on Google Cloud Composer

Question  Votes: 1  Answers: 1

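I am trying to add a custom operator to Google Cloud Composer (Airflow), but Airflow cannot seem to find the operator. I have already spent a lot of time on this and tried several approaches.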

I modified the code from the example to try to get the operator picked up.

dags/my_dag.py

import datetime
from airflow import DAG
# from airflow.models import Variable
# from airflow.operators import StopInstanceOperator
from airflow.operators.my_operator import StopInstanceOperator
# [END dag_imports]

# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]

# [START dag]
dag1 = DAG('backup_vm_instance',
           description='Backup a Compute Engine instance using an Airflow DAG',
           schedule_interval=INTERVAL,
           start_date=START_DATE,
           catchup=False)
# [END dag]

## Compute Engine tasks
stop_instance = StopInstanceOperator(
    project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
# [END operators]

# Airflow DAG definition
begin >> stop_instance

plugins/my_operator.py

import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]


class StopInstanceOperator(BaseOperator):
  """Stops the virtual machine instance."""

  @apply_defaults
  def __init__(self, project, zone, instance, *args, **kwargs):
    self.project = project
    self.zone = zone
    self.instance = instance
    super(StopInstanceOperator, self).__init__(*args, **kwargs)

  def execute(self, context):
    logging.info("Hello world")
    # [END stop_oper_no_xcom]

class GoogleComputeEnginePlugin(AirflowPlugin):
  """Expose Airflow operators."""

  name = 'gce_commands_plugin'
  operators = [StopInstanceOperator]

That is the code and the directory structure. The error I get in Airflow is:

Broken DAG: [/home/airflow/gcs/dags/my_dag.py] No module named 'airflow.operators.my_operator'

python airflow google-cloud-composer
1 Answer
0
votes

I have tested your Composer files, and after cleaning up the code they look OK.

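First, make sure you have been granted the following permissions, which are required to add and update plugins:

  • storage.objectAdmin to upload files
  • composer.environments.get to look up the DAG destination bucket. This permission is not required when using the Cloud Storage API or gsutil

Starting with plugins/my_operator.py: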

# [START imports]
import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]

class StopInstanceOperator(BaseOperator):
  """Stops the virtual machine instance."""

  @apply_defaults
  def __init__(self, project, zone, instance, *args, **kwargs):
    self.project = project
    self.zone = zone
    self.instance = instance
    super(StopInstanceOperator, self).__init__(*args, **kwargs)

  def execute(self, context):
    logging.info('Hello world!')

class GoogleComputeEnginePlugin(AirflowPlugin):
  """Expose Airflow operators."""

  name = 'gce_commands_plugin'
  operators = [StopInstanceOperator]

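The code is correct. Pay attention to the name = 'gce_commands_plugin' variable: it gives the plugin its internal name, gce_commands_plugin (which is what you reference in your DAG files), and the operators list adds one operator to it (StopInstanceOperator).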
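To make the naming rule concrete, here is a minimal standard-library sketch of the idea. It only simulates the mechanism: to my understanding, Airflow 1.x builds a module named after the plugin's name attribute and places it in sys.modules, but the namespace fake_airflow.operators, the helpers register_plugin and can_import, and the stand-in classes are all hypothetical and are not part of Airflow.

```python
import importlib
import sys
import types


class StopInstanceOperator:
    """Stand-in for the real operator; this sketch does not need Airflow."""


class GoogleComputeEnginePlugin:
    """Stand-in for an AirflowPlugin subclass (hypothetical, not Airflow's class)."""
    name = "gce_commands_plugin"
    operators = [StopInstanceOperator]


def register_plugin(plugin, namespace="fake_airflow.operators"):
    """Expose the plugin's operators as the module '<namespace>.<plugin.name>'.

    The module name comes from plugin.name, never from the name of the
    file in which the plugin class happens to be defined.
    """
    module_name = namespace + "." + plugin.name
    module = types.ModuleType(module_name)
    for operator in plugin.operators:
        setattr(module, operator.__name__, operator)
    sys.modules[module_name] = module
    return module_name


def can_import(module_name):
    """Return True if module_name can be imported, False otherwise."""
    try:
        importlib.import_module(module_name)
        return True
    except ImportError:
        return False


registered = register_plugin(GoogleComputeEnginePlugin)
print(registered)
print(can_import("fake_airflow.operators.gce_commands_plugin"))
print(can_import("fake_airflow.operators.my_operator"))
```

In this simulation only the name derived from the plugin's name attribute is importable. The file-based name (my_operator) fails in the same way as the "No module named" error in the question.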

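With that in place, we have dags/my_dag.py. Here is the version with the corrected import; what I noticed in your original file is explained after the code: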

# [START dag_imports]
import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.gce_commands_plugin import StopInstanceOperator
# [END dag_imports]

# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]

# [START dag]
dag1 = DAG('backup_vm_instance',
           description='Backup a Compute Engine instance using an Airflow DAG',
           schedule_interval=INTERVAL,
           start_date=START_DATE,
           catchup=False)
# [END dag]

# [START operators]
## Compute Engine tasks
stop_instance = StopInstanceOperator(
    project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance',
    dag=dag1)  # attach the task to the DAG; without dag= it is not registered
# [END operators]

You are trying to reference the plugin's file name, which is wrong. You have to reference the name = 'gce_commands_plugin' variable from the my_operator.py file:

from airflow.operators.gce_commands_plugin import StopInstanceOperator

You also cannot use the following snippet:

# Airflow DAG definition
begin >> stop_instance

This is because you decided not to use the dummy_operator that defines the begin variable.

Once the files are ready, copy them to the Composer bucket and you should see the DAG load correctly in the Airflow UI.

Hope this helps.
