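I'm trying to add a custom operator to Google Cloud Composer (Airflow), but the operator can't be found. I've spent a lot of time on this. What I tried:
I modified the code from the example to try to get the operator to load.
dags/my_dag.py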
import datetime
from airflow import DAG
# from airflow.models import Variable
# from airflow.operators import StopInstanceOperator
from airflow.operators.my_operator import StopInstanceOperator
# [END dag_imports]
# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]
# [START dag]
dag1 = DAG('backup_vm_instance',
           description='Backup a Compute Engine instance using an Airflow DAG',
           schedule_interval=INTERVAL,
           start_date=START_DATE,
           catchup=False)
# [END dag]
## Compute Engine tasks
stop_instance = StopInstanceOperator(
    project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance')
# [END operators]
# Airflow DAG definition
begin >> stop_instance
plugins/my_operator.py
import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]


class StopInstanceOperator(BaseOperator):
    """Stops the virtual machine instance."""

    @apply_defaults
    def __init__(self, project, zone, instance, *args, **kwargs):
        self.project = project
        self.zone = zone
        self.instance = instance
        super(StopInstanceOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        logging.info("Hello world")
# [END stop_oper_no_xcom]


class GoogleComputeEnginePlugin(AirflowPlugin):
    """Expose Airflow operators."""
    name = 'gce_commands_plugin'
    operators = [StopInstanceOperator]
That is the code and folder structure. The error I get in Airflow is:
Broken DAG: [/home/airflow/gcs/dags/my_dag.py] No module named 'airflow.operators.my_operator'
I have tested your Composer files, and after cleaning up the code they work.
First, make sure you have been granted the permissions required to add and update plugins.
Let's start with plugins/my_operator.py:
# [START imports]
import datetime
import logging
import time
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
# [END imports]
class StopInstanceOperator(BaseOperator):
    """Stops the virtual machine instance."""

    @apply_defaults
    def __init__(self, project, zone, instance, *args, **kwargs):
        self.project = project
        self.zone = zone
        self.instance = instance
        super(StopInstanceOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        logging.info('Hello world!')


class GoogleComputeEnginePlugin(AirflowPlugin):
    """Expose Airflow operators."""
    name = 'gce_commands_plugin'
    operators = [StopInstanceOperator]
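This code is correct. Pay attention to the name = 'gce_commands_plugin' variable: it gives the plugin the internal name gce_commands_plugin (which is what you reference in the DAG file) and adds one operator, StopInstanceOperator, to it.
Thanks to this, StopInstanceOperator is exposed under the plugin's name.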
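To see why the plugin's `name`, not its file name, decides the import path, here is a simplified, stdlib-only model of how the Airflow 1.x plugin manager exposes a plugin's `operators` as a module named after the plugin. It is a sketch, not Airflow's actual code: `demo_operators` stands in for `airflow.operators`, and `register_plugin_operators` and the two stand-in classes are hypothetical. Note that this import path is specific to Airflow 1.x; Airflow 2.0 removed `airflow.operators.<plugin name>` imports, and because the plugins folder is on `sys.path` there, you would import the module directly (for example `from my_operator import StopInstanceOperator`).

```python
import sys
import types


class StopInstanceOperator:
    """Stand-in for the real BaseOperator subclass (hypothetical)."""


class GoogleComputeEnginePlugin:
    """Stand-in for the AirflowPlugin subclass in plugins/my_operator.py."""
    name = "gce_commands_plugin"
    operators = [StopInstanceOperator]


def register_plugin_operators(plugin, namespace="demo_operators"):
    """Expose plugin.operators as the module '<namespace>.<plugin.name>'.

    Simplified model of Airflow 1.x behaviour; `namespace` plays the role
    of `airflow.operators`. Returns the registered module name.
    """
    if namespace not in sys.modules:
        package = types.ModuleType(namespace)
        package.__path__ = []  # a package with no files on disk
        sys.modules[namespace] = package
    module_name = namespace + "." + plugin.name
    module = types.ModuleType(module_name)
    for operator in plugin.operators:
        setattr(module, operator.__name__, operator)
    sys.modules[module_name] = module
    setattr(sys.modules[namespace], plugin.name, module)
    return module_name


register_plugin_operators(GoogleComputeEnginePlugin)

# Works: the module path is built from the plugin's `name` attribute.
from demo_operators.gce_commands_plugin import StopInstanceOperator as Imported
print(Imported is StopInstanceOperator)  # True

# Fails with ModuleNotFoundError: nothing is registered under the file
# name `my_operator`, mirroring the "Broken DAG" error in the question.
try:
    from demo_operators.my_operator import StopInstanceOperator as Missing
except ModuleNotFoundError as exc:
    print(exc)
```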
Next, dags/my_dag.py.
Here is the cleaned-up version; the issues I noticed are explained after it:
# [START dag_imports]
import datetime
from airflow import DAG
from airflow.models import Variable
from airflow.operators.gce_commands_plugin import StopInstanceOperator
# [END dag_imports]
# [START dag_parameters]
INTERVAL = '@daily'
START_DATE = datetime.datetime(2018, 7, 16)
PROJECT = "project"
ZONE = "zone"
INSTANCE = "instance"
DISK = "disk"
# [END dag_parameters]
# [START dag]
dag1 = DAG('backup_vm_instance',
           description='Backup a Compute Engine instance using an Airflow DAG',
           schedule_interval=INTERVAL,
           start_date=START_DATE,
           catchup=False)
# [END dag]
# [START operators]
## Compute Engine tasks
stop_instance = StopInstanceOperator(
    project=PROJECT, zone=ZONE, instance=INSTANCE, task_id='stop_instance',
    dag=dag1)  # attach the task to the DAG so it shows up under backup_vm_instance
# [END operators]
You were trying to reference the plugin's file name, which is wrong:
from airflow.operators.my_operator import StopInstanceOperator
Instead, you have to reference the value of the name = 'gce_commands_plugin' variable from the my_operator.py file:
from airflow.operators.gce_commands_plugin import StopInstanceOperator
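If you are unsure which import path a plugin file exposes, a small static check can read it off the plugin class. The helper below, `plugin_import_lines`, is hypothetical (not part of Airflow): it parses the source with Python's `ast` module without importing Airflow, and it assumes the Airflow 1.x `airflow.operators.<name>` convention, a string-literal `name`, and an `operators` list of plain class names.

```python
import ast

PLUGIN_SOURCE = '''
from airflow.plugins_manager import AirflowPlugin

class StopInstanceOperator(object):
    pass

class GoogleComputeEnginePlugin(AirflowPlugin):
    name = 'gce_commands_plugin'
    operators = [StopInstanceOperator]
'''


def plugin_import_lines(source):
    """Return the Airflow 1.x import lines exposed by AirflowPlugin subclasses."""
    lines = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.ClassDef):
            continue
        # Accept both `AirflowPlugin` and `plugins_manager.AirflowPlugin` as a base.
        base_names = [getattr(b, "id", None) or getattr(b, "attr", None) for b in node.bases]
        if "AirflowPlugin" not in base_names:
            continue
        name, operators = None, []
        for stmt in node.body:
            if not (isinstance(stmt, ast.Assign) and len(stmt.targets) == 1):
                continue
            target = getattr(stmt.targets[0], "id", None)
            if target == "name" and isinstance(stmt.value, ast.Constant):
                name = stmt.value.value
            elif target == "operators" and isinstance(stmt.value, ast.List):
                operators = [getattr(e, "id", "?") for e in stmt.value.elts]
        if name:
            for op in operators:
                lines.append(f"from airflow.operators.{name} import {op}")
    return lines


for line in plugin_import_lines(PLUGIN_SOURCE):
    print(line)  # from airflow.operators.gce_commands_plugin import StopInstanceOperator
```

To check the real file, pass the contents of plugins/my_operator.py (for example `open('plugins/my_operator.py').read()`) instead of the inline string.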
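You also can't use the following snippet:
# Airflow DAG definition
begin >> stop_instance
This is because you decided not to use the dummy_operator that defines the begin variable, so begin is never defined in your DAG file.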
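The `>>` in `begin >> stop_instance` is Airflow's bitshift shorthand for setting a downstream dependency, so both sides must be defined task objects. The sketch below models that with a hypothetical `Task` class (not Airflow's `BaseOperator`) to show what the line does once `begin` exists. In the actual Airflow 1.x DAG file you would either delete the line or define `begin`, for example with `from airflow.operators.dummy_operator import DummyOperator` and `begin = DummyOperator(task_id='begin', dag=dag1)`.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator: a task_id plus downstream edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # `self >> other` records that `other` runs after `self`; returning
        # `other` lets chains like `a >> b >> c` work, as in Airflow.
        self.downstream.append(other)
        return other


stop_instance = Task("stop_instance")

# The question's DAG file skipped this step, so `begin` was undefined there
# and `begin >> stop_instance` would raise NameError.
begin = Task("begin")
begin >> stop_instance

print([t.task_id for t in begin.downstream])  # ['stop_instance']
```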
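Once the files are ready, copy them to your Composer environment's bucket (the DAG into dags/, the plugin into plugins/), and you should see the expected result in the Airflow UI.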
Hope this helps.