I'm running into a problem templating an Airflow Variable into a PostgresOperator SQL script. My SQL script looks like this:
UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
The problem is the datalake_bucket value. When I use a plain PostgresOperator:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 11),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': notification_fail,
    'on_retry_callback': notification_retry,
}
with DAG(
    'learning_bronze_dag',
    default_args=default_args,
    description='Load all learning data to bronze',
    catchup=False,
    max_active_runs=1,
    schedule_interval='@daily',
) as dag:

    starter = DummyOperator(task_id="starter")

    tables = [
        'course_templates',
        'courses',
        'course_items',
        'course_events',
        'organization_permissions',
    ]

    with TaskGroup('unload_tasks') as redshift_unload:
        for table in tables:
            op = PostgresOperator(
                task_id=table,
                dag=dag,
                postgres_conn_id=default_connection,
                autocommit=True,
                params={
                    'table_name': table,
                    'datalake_bucket': '{{var.value.datalake_bucket}}',
                },
                sql='sql/load_learning_to_s3.sql'
            )

    chain(starter, redshift_unload)
I get a SQL error in that task:
psycopg2.errors.InternalError_: S3ServiceException:The specified bucket is not valid.
Failed to initialize S3 output stream. S3 path:
s3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/dt=2023-05-02/0002_part_00.parquet
So I wrote a small custom operator to turn the params field into a templated field:
from airflow.models.dag import DAG
from airflow.models.baseoperator import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults
from typing import Optional
class CustomPostgresOperator(BaseOperator):
    """Allows templating of parameter fields in
    a postgres operator
    """

    template_fields = ('params', 'sql')
    template_fields_renderers = {
        'sql': 'sql'
    }
    template_ext = ('.sql',)
    ui_color = '#ededed'

    @apply_defaults
    def __init__(
        self,
        *,
        sql: str,
        autocommit: bool = False,
        postgres_conn_id: str = 'redshift_default',
        params: Optional[dict] = None,
        database: Optional[str] = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id
        self.params = params or {}
        self.hook = None
        self.sql = sql
        self.autocommit = autocommit
        self.database = database

    def execute(self, **kwargs):
        self.log.info('Executing: %s', self.sql)
        # adding this for visibility
        self.log.info(f'Templating {self.params}')
        self.hook = PostgresHook(postgres_conn_id=self.postgres_conn_id, schema=self.database)
        self.hook.run(self.sql, self.autocommit, parameters=self.params)
        for output in self.hook.conn.notices:
            self.log.info(output)
I can see from the log output that my bucket variable is being templated:
[2023-06-30 20:31:10,486] {{postgres.py:40}} INFO - Templating {'table_name': 'organization_permissions', 'datalake_bucket': 'my-bucket'}
But in the same log output it still shows:
psycopg2.errors.InternalError_: S3ServiceException:The specified bucket is not valid
Failed to initialize S3 output stream. S3 path: s3://{{var.value.datalake_bucket}}...
Why can't I get the datalake_bucket variable to come through?
I'm on Amazon Managed Workflows for Apache Airflow (MWAA) version 2.0.2 and can't upgrade at the moment; I just want to understand why the params aren't working.
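From what I can tell, the stock PostgresOperator only declares sql as a templated field, so anything placed in params is never rendered in the first place; a quick sanity check (assuming the postgres provider version that ships with MWAA 2.0.2) would be:

from airflow.providers.postgres.operators.postgres import PostgresOperator

# 'params' is not listed here, so a value like '{{var.value.datalake_bucket}}'
# is handed to the SQL rendering as a raw, unrendered string
print(PostgresOperator.template_fields)  # expected: ('sql',)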
The output of the two log.info statements looks like this:
[2023-06-30 20:31:10,466] {{postgres.py:39}} INFO - Executing: UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_organization_permissions')
TO 's3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
[2023-06-30 20:31:10,486] {{postgres.py:40}} INFO - Templating {'table_name': 'organization_permissions', 'datalake_bucket': 'mybucket'}
After a lot of trial and error, this turned out to be an X-Y problem. I can reference templated Airflow Variables directly in the SQL script itself, so the answer is to mix fixed params, string values, and Jinja-templated variables inside the SQL script. My original script was:
UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
But it really should be:
UNLOAD('SELECT *, trunc(updated_at) as dt FROM prodreadcopy.cmd_{{ params.table_name }}')
-- note the var.value.key format
TO 's3://{{ var.value.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'
IAM_ROLE 'arn:aws:iam::1234567890:role/RedshiftETLRole'
PARTITION BY (dt)
PARQUET
ALLOWOVERWRITE;
So my PostgresOperator ends up looking like:
with TaskGroup('unload_tasks') as redshift_unload:
    for table in tables:
        op = PostgresOperator(
            task_id=table,
            dag=dag,
            postgres_conn_id=default_connection,
            autocommit=True,
            params={
                'table_name': table,  # only this value is still passed as a param
            },
            sql='sql/load_learning_to_s3.sql'
        )
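For completeness: the reason the original params-based approach never resolved appears to be that Jinja renders in a single pass, so a param value that itself contains a {{ ... }} expression is substituted verbatim and never re-rendered. A minimal sketch with plain Jinja2 (outside Airflow, reusing the values from the DAG above) reproduces the behaviour:

from jinja2 import Template

sql = "TO 's3://{{ params.datalake_bucket }}/bronze/learning/{{ params.table_name }}/'"
params = {
    'table_name': 'organization_permissions',
    'datalake_bucket': '{{var.value.datalake_bucket}}',  # raw Jinja string, as in the original params dict
}

# One rendering pass: the nested expression survives as a literal string,
# which is exactly the invalid S3 path Redshift complained about
print(Template(sql).render(params=params))
# TO 's3://{{var.value.datalake_bucket}}/bronze/learning/organization_permissions/'

That lines up with the log output above, where the rendered SQL still contains the literal {{var.value.datalake_bucket}} even though self.params had been rendered.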
PostgresOperator is deprecated; use airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator instead. In SQLExecuteQueryOperator, parameters has been added to the templated fields, so you no longer have to manage fixed and dynamic values in two different places.
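A minimal sketch of what that could look like for the task group above (untested here, reusing the tables list, default_connection, and SQL file from the question; the common-sql provider likely isn't installable on the old MWAA 2.0.2 environment):

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with TaskGroup('unload_tasks') as redshift_unload:
    for table in tables:
        op = SQLExecuteQueryOperator(
            task_id=table,
            conn_id=default_connection,  # note: conn_id rather than postgres_conn_id
            autocommit=True,
            params={'table_name': table},
            sql='sql/load_learning_to_s3.sql',
        )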