没有定义conn_id

问题描述 投票:0回答:1

我正在学习Airflow,我想了解连接的工作原理。

我有一个第一个dag与下面的代码。

c = Connection(
    conn_id='aws_credentials',
    conn_type='Amazon Web Services',
    login='xxxxxxxx',
    password='xxxxxxxxx'
)


def list_keys():
    hook = S3Hook(aws_conn_id=c.conn_id)
    logging.info(f"Listing Keys from {bucket}/{prefix}")
    keys = hook.list_keys(bucket, prefix=prefix)
    for key in keys:
        logging.info(f"- s3://{bucket}/{key}")

在这种情况下,它的工作是正常的。连接很好地传递到S3Hook。

然后我有第二个dag。

redshift_connection = Connection(
    conn_id='redshift',
    conn_type='postgres',
    login='duser',
    password='xxxxxxxxxx',
    host='xxxxxxxx.us-west-2.redshift.amazonaws.com',
    port=5439,
    schema='db'
)

aws_connection = Connection(
    conn_id='aws_credentials',
    conn_type='Amazon Web Services',
    login='xxxxxxxxx',
    password='xxxxxxxx'
)

def load_data_to_redshift(*args, **kwargs):
    aws_hook = AwsHook(aws_connection.conn_id)
    credentials = aws_hook.get_credentials()
    redshift_hook = PostgresHook(redshift_connection.conn_id)
    sql_stmnt = sql_statements.COPY_STATIONS_SQL.format(aws_connection.login, aws_connection.password)
    redshift_hook.run(sql_stmnt)

dag = DAG(
    's3_to_Redshift',
    start_date=datetime.datetime.now()
    )

create_table = PostgresOperator(
    task_id='create_table',
    postgres_conn_id=redshift_connection.conn_id,
    sql=sql_statements.CREATE_STATIONS_TABLE_SQL,
    dag=dag
    )

这个dag返回了以下错误 The conn_idredshiftisn't defined

为什么会这样?我的第一个和第二个dag之间的区别是什么?为什么连接似乎在第一个例子中工作,而不是在第二个情况下?

谢谢,我正在学习Airflow,我想了解连接是如何工作的。

airflow
1个回答
2
投票

连接通常是使用UI或CLI创建的,如上所述。此处 并由Airflow存储在数据库后端。然后,操作者和各自的钩子将一个连接ID作为参数,并使用它来检索这些连接的用户名、密码等。

在你的案例中,我怀疑你创建了一个ID为 aws_credentials 使用UI或CLI。所以,当你把它的ID传给 S3Hook 它成功地检索了凭证(从数据库中,而不是从 Connection 对象)。)

但是,你并没有创建一个ID为-的连接。redshift因此, AwsHook 抱怨说它没有被定义。你必须先按照文档中的描述创建连接。

注意:不在DAG代码中定义连接的原因是,DAG代码通常存储在版本控制系统中(如Git)。而把凭证存储在那里会有安全风险。

© www.soinside.com 2019 - 2024. All rights reserved.