直接将雪花 dwh 中的表加载到 postgreSQL 数据库中

问题描述 投票:0回答:1

我正在尝试编写一些 python 脚本,这些脚本将获取 Snowflake 中的表并将它们传输到 postgreSQL 数据库。我必须将雪花表中的每一行转换为 json 字符串,因此在 postgreSQL 中,该表将只有索引列和一个包含雪花表中整行的 json 字符串。

我可以处理这部分,但我很难缩短加载时间。我当前的方法涉及使用 Snowflake COPY 命令将 snwoflake 表作为 csv.gz 文件提取到 s3 存储桶中。然后,在 s3 中,我使用 psycopg2 库连接到 postgreSQL 数据库,并使用 copy_expert 方法和另一个 COPY 查询将所有 csv 文件从 s3 加载到我的 postgreSQL 数据库中的特定表中。

整个过程需要我 2-3 小时才能加载大约 50-60 GB 的数据。通过谷歌搜索,我得到的印象是这可以更快地完成,并且想知道我是否可以使用 python 从雪花直接连接到 postgreSQL 并执行一些命令来复制数据?任何建议表示赞赏,谢谢!

当前方法涉及使用 python 对 Snowflake 和 postgreSQL 数据库执行查询。对雪花中的表进行一些操作后,我使用以下方法将其复制到 s3 中:

import os
import psycopg2
import boto3
from connections import SnFlDWH

# copy table from snowflake into s3
query = f"COPY INTO 's3://{bucket_name}/{s3_key}'
FROM (
    SELECT * 
    FROM {db_name}.{schema_name}.{table_name}
)
FILE_FORMAT = (
    TYPE = CSV,
    FIELD_OPTIONALLY_ENCLOSED_BY = '\"' 
)
OVERWRITE = TRUE
CREDENTIALS = (
    AWS_KEY_ID='{aws_key}',
    AWS_SECRET_KEY='{aws_secret}'
);"

# establish a connection to snowflake and execute the query
snfl_conn = SnFlDWH()
snfl_conn.execqute_sql(query)

# establish a connection to postgreSQL db
pstg_conn = psycopg2.connect(
        host=host,
        database=database,
        user=user,
        password=password
    )

cursor = conn.cursor()

# connect to s3 and get a list of all files on specific s3 bucket
# download each csv.gz file and read the file in locally
# load each csv file into the specified postgreSQL table using the COPY command
s3_client = boto3.client('s3', aws_key=aws_key, aws_secret=aws_secret, region_name=region_name)
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=s3_key)
    for obj in response.get('Contents', []):
        file_name = obj['Key']
        if file_name.endswith('.csv.gz'):
            # Download files from S3
            local_file_name = f"/tmp/{os.path.basename(file_name)}"
            s3_client.download_file(bucket_name, file_name, local_file_name)
            #Load data from local file into Postgres table
            with gzip.open(local_file_name, 'rb') as f:
                cursor.copy_expert(f"COPY {postgres_table_name} FROM STDIN WITH CSV DELIMITER ','", f)
            # Clean up: Delete the downloaded file
            os.remove(local_file_name)
            #Delete file from S3
            s3_client.delete_object(Bucket=bucket_name, Key=file_name)
            
    # Commit the transaction and close the cursor and connection
    conn.commit()
    cursor.close()
    conn.close()

database postgresql snowflake-cloud-data-platform etl data-warehouse
1个回答
0
投票

访问 Snowflake 之外的数据库或其他服务需要使用外部网络访问。请参阅从 Snowflake 中连接到本地数据库,了解更多信息以及连接到另一个数据库的示例。

© www.soinside.com 2019 - 2024. All rights reserved.