我正在构建一个 DBT 数据转换管道,它需要从 s3 位置读取 parquet 数据并将输出再次写入另一个 S3 位置。
-
需要进行哪些配置更改*注意:不想安装 duckdb 或任何其他集成。
期待无缝集成,从 S3 提取数据,处理数据并将处理后的输出存储回另一个 S3 位置。
从 Amazon S3 读取非常简单。您可以使用 read_files 表值函数 | AWS 上的 Databricks:
select * from read_files('s3://bucket_name/folder/', format => 'parquet')
您需要将 S3 存储桶设置为 外部位置 | AWS 上的 Databricks,以便它具有访问存储桶的凭据。
写作更困难。在dbt完成后,我实际上运行了一个Databricks Notebook,并使用Python提取数据并写入输出Parquet文件:
for table_name in tables_to_copy:
df = spark.read.table(f"{input_database}.{input_schema}.{table_name}")
df.write.mode("overwrite").parquet(f's3://{output_bucket}/{run_timestamp}/{table_name}/')
另一种方法可能是使用 Databricks 配置选项定义 dbt 模型来设置
OUTPUT
和 LOCATION
。但是,当该位置中可能存在先前运行留下的先前文件时,我不知道后续运行时会发生什么。