S3 文件传输和 Glue ETL

问题描述 投票:0回答:1

我目前在一家新公司实习,担任数据工程师,在第一个任务中遇到了一些挑战。我希望有更多经验的人可以提供一些指导和资源。

问题是这样的:

任务:我的任务是将数据从一个 S3 存储桶(我们称之为账户 A)传输到另一个 S3 存储桶(账户 B)。源路径为 s3://X/Y,目标路径应为 s3://X/Y/daily X 是存储桶名称。

数据结构:在账户 A 的“Y”目录中,有许多文件夹,每个文件夹都以日期命名(例如“2022-01-01”)。每个日期文件夹内都有多个 .csv 文件。我的目标是将所有这些文件夹和文件移动到目标位置,同时将它们转换为 Parquet 格式。

挑战:我尝试了各种方法来实现这一目标,但似乎没有任何效果按预期进行。我相信设置 Glue ETL 作业可能是一个解决方案(必须仅通过 Glue ETL 完成),并且我在两个 S3 帐户中都具有读取和写入的必要权限。所以,这似乎不是权限问题。

我非常感谢您提供的任何帮助、指导或资源来帮助我成功完成这项任务。如果您曾在类似场景中使用过 Glue ETL 作业,或者对替代方法有建议,请分享您的见解。

先谢谢大家的支持!

挑战:我尝试了各种方法来实现这一目标,但似乎没有任何效果按预期进行。我相信设置 Glue ETL 作业可能是一个解决方案(必须仅通过 Glue ETL 完成),并且我在两个 S3 帐户中都具有读取和写入的必要权限。所以,这似乎不是权限问题。

amazon-web-services apache-spark amazon-s3 etl aws-glue
1个回答
0
投票

首先,如果您需要将数据从 CSV 格式转换为 Parquet,那么仅仅复制是行不通的。需要读取数据并写入一组新文件。

我假设所有 CSV 文件都具有相同的结构(列和类型)。如果不是这种情况,那么您将需要分组工作来处理每个不同的文件结构。

步骤1.

在 Spark(在您的情况下为 Glue)中,使用通配符将 CSV 读入 DataFrame,以同时读取所有日期文件夹

inputDF = spark.read.csv("s3://<basepath>/*/*.csv")

步骤 2.

添加包含输入文件名的列,以便能够从路径中提取日期,并在后续步骤中使用它来生成输出文件夹

val input2DF = inputDF.withColumn("filename", input_file_name())

步骤 3.

为要用作输出分区的每列添加列(例如年、月、日或整个日期)

val input3DF = input2DF.withColumn("year", substr(filename, 60, 4))
                       .withColumn("date", substr(filename, 60, 10))

这只是一个简单的例子,您可以更有创意并使用正则表达式函数。这取决于您的路径结构。

步骤 4.

删除不必要的列。

val outputDF = input3DF.drop("filename")

第 5 步。

使用Parquet格式将DataFrame数据写入tarteg文件夹

outputDF.write.partitionBy("year", "date").parquet("s3://<targetpath>")

按照这种方法,文件将以如下文件夹结尾:

s3://<targetpath>/year=*2023*/date=*20230115*/<parquet-file>

注意事项:

  1. 为了避免在输出中创建大量文件,请在写入之前考虑repartitionoutputDF。
  2. 用于创建分区的列的值应遵循目标文件系统的规则,以创建有效的文件夹名称(避免空格、符号等)
  3. 名称类似于
    name=value
    的文件夹是定义分区列的标准。 Parquet reader 知道如何发现它们并根据这些值创建列。 Parquet - 分区发现
© www.soinside.com 2019 - 2024. All rights reserved.