Templating problem in the Airflow PostgresOperator


I'm running into a problem templating an Airflow Variable into a PostgresOperator SQL script. My SQL script looks like this:

UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;

The current problem is with datalake_bucket. When I use the regular PostgresOperator:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup

# notification_fail, notification_retry, default_connection and CustomPostgresOperator
# (shown further down) are defined elsewhere in the project

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 11),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': notification_fail,
    'on_retry_callback': notification_retry,
}

with DAG(
    'learning_bronze_dag',
    default_args=default_args,
    description='Load all learning data to bronze',
    catchup=False,
    max_active_runs=1,
    schedule_interval='@daily',
) as dag:

    starter = DummyOperator(task_id="starter")

    tables = [
        'course_templates',
        'courses',
        'course_items',
        'course_events',
        'organization_permissions',
    ]

    with TaskGroup('unload_tasks') as redshift_unload:
        for table in tables:
            op = CustomPostgresOperator(
                task_id=table,
                dag=dag,
                postgres_conn_id=default_connection,
                autocommit=True,
                params={
                    'table_name': table,
                    'datalake_bucket': '{{var.value.datalake_bucket}}',
                },
                sql='sql/load_learning_to_s3.sql'
            )

    chain(starter, redshift_unload)

I get a SQL error in that task:

psycopg2.errors.InternalError_: S3ServiceException:The specified bucket is not valid.
Failed to initialize S3 output stream. S3 path: 
s3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/dt=2023-05-02/0002_part_00.parquet

So I wrote a small operator to turn the params field into a templated field:

from airflow.models.dag import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults
from typing import Optional


class CustomPostgresOperator(BaseOperator):
    """Allows templating of parameter fields in
    a postgres operator
    """
    template_fields = ('params', 'sql')
    template_fields_renderers = {
        'sql': 'sql'
    }
    template_ext = ('.sql',)
    ui_color = '#ededed'

    @apply_defaults
    def __init__(
        self,
        *,
        sql: str,
        autocommit: bool = False,
        postgres_conn_id: str='redshift_default',
        params: Optional[dict]=None,
        database: Optional[str]=None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.params = params or {}
        self.hook = None
        self.sql = sql
        self.autocommit = autocommit
        self.database = database


    def execute(self, **kwargs):
        self.log.info('Executing: %s', self.sql)

        # adding this for visibility
        self.log.info(f'Templating {self.params}')

        self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        self.hook.run(self.sql, self.autocommit, parameters=self.params)
        for output in self.hook.conn.notices:
            self.log.info(output)

I can see from the log output that my bucket variable is being templated:

[2023-06-30 20:31:10,486] {{postgres.py:40}} INFO - Templating {'table_name': 'organization_permissions', 'datalake_bucket': 'my-bucket'}

But in the same log output it still shows:

psycopg2.errors.InternalError_: S3ServiceException:The specified bucket is not valid
Failed to initialize S3 output stream. S3 path: s3://{{var.value.datalake_bucket}}...

Why can't I get the datalake_bucket variable through?

I'm on Amazon Managed Workflows for Apache Airflow (MWAA), version 2.0.2. I can't upgrade right now; I just want to understand why the params aren't working.

Edit

The log.info output from the two log statements looks like this:

[2023-06-30 20:31:10,466] {{postgres.py:39}} INFO - Executing: UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_organization_permissions')
TO 's3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
[2023-06-30 20:31:10,486] {{postgres.py:40}} INFO - Templating {'table_name': 'organization_permissions', 'datalake_bucket': 'mybucket'}
python postgresql amazon-s3 airflow mwaa
2 Answers

Score: 2

After a bunch of trial and error, this turned out to be an X-Y problem. I can reference templated Variables directly in the SQL script itself, so I should really be mixing params for fixed string values with Jinja-templated Variables in the SQL script. My original script was:

UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;

but it really should be:

UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
-- note the var.value.key format
TO 's3://{{ var.value.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;

So my PostgresOperator now looks like:

    with TaskGroup('unload_tasks') as redshift_unload:
        for table in tables:
            op = PostgresOperator(
                task_id=table,
                dag=dag,
                postgres_conn_id=default_connection,
                autocommit=True,
                params={
                    'table_name': table # only this value gets a param
                },
                sql='sql/load_learning_to_s3.sql'
            )
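
As far as I can tell, the reason my custom operator didn't help is that Jinja rendering is single-pass: the SQL file is rendered against the task context, and the string stored under params is inserted verbatim, so any {{ ... }} inside that value never gets evaluated. A quick sketch with plain Jinja (nothing Airflow-specific, just to illustrate the behaviour) reproduces what I saw in the logs:

from jinja2 import Template

# the un-rendered value that was passed in via params
params = {'datalake_bucket': '{{ var.value.datalake_bucket }}'}

sql = "TO 's3://{{ params.datalake_bucket }}/bronze/learning/courses/'"

# single pass: the value is substituted as-is, and its own {{ ... }}
# is never rendered against the context again
print(Template(sql).render(params=params))
# TO 's3://{{ var.value.datalake_bucket }}/bronze/learning/courses/'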

Score: 0

PostgresOperator is deprecated. Use airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator instead.

In SQLExecuteQueryOperator, parameters has been added to the template fields, so you don't have to manage fixed and dynamic values in two different places.
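
A minimal sketch of what that looks like (the connection id and query below are placeholders, not taken from the DAG above): because parameters is a template field, the Jinja expression is rendered first and the result is then bound as an ordinary DB-API parameter.

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

count_recent = SQLExecuteQueryOperator(
    task_id='count_recent_rows',
    conn_id='redshift_default',  # placeholder connection id
    # %(since)s is a DB-API placeholder, filled from `parameters` at run time
    sql='SELECT count(*) FROM prodreadcopy.cmd_courses WHERE trunc(updated_at) >= %(since)s',
    # `parameters` is itself a template field, so the macro below is rendered
    # before the value is bound
    parameters={'since': '{{ ds }}'},
)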
