我目前在一家新公司实习,担任数据工程师,在第一个任务中遇到了一些挑战。我希望有更多经验的人可以提供一些指导和资源。
问题是这样的:
任务:我的任务是将数据从一个 S3 存储桶(我们称之为账户 A)传输到另一个 S3 存储桶(账户 B)。源路径为 s3://X/Y,目标路径应为 s3://X/Y/daily X 是存储桶名称。
数据结构:在账户 A 的“Y”目录中,有许多文件夹,每个文件夹都以日期命名(例如“2022-01-01”)。每个日期文件夹内都有多个 .csv 文件。我的目标是将所有这些文件夹和文件移动到目标位置,同时将它们转换为 Parquet 格式。
挑战:我尝试了各种方法来实现这一目标,但似乎没有任何效果按预期进行。我相信设置 Glue ETL 作业可能是一个解决方案(必须仅通过 Glue ETL 完成),并且我在两个 S3 帐户中都具有读取和写入的必要权限。所以,这似乎不是权限问题。
我非常感谢您提供的任何帮助、指导或资源来帮助我成功完成这项任务。如果您曾在类似场景中使用过 Glue ETL 作业,或者对替代方法有建议,请分享您的见解。
先谢谢大家的支持!
挑战:我尝试了各种方法来实现这一目标,但似乎没有任何效果按预期进行。我相信设置 Glue ETL 作业可能是一个解决方案(必须仅通过 Glue ETL 完成),并且我在两个 S3 帐户中都具有读取和写入的必要权限。所以,这似乎不是权限问题。
首先,如果您需要将数据从 CSV 格式转换为 Parquet,那么仅仅复制是行不通的。需要读取数据并写入一组新文件。
我假设所有 CSV 文件都具有相同的结构(列和类型)。如果不是这种情况,那么您将需要分组工作来处理每个不同的文件结构。
步骤1.
在 Spark(在您的情况下为 Glue)中,使用通配符将 CSV 读入 DataFrame,以同时读取所有日期文件夹。
inputDF = spark.read.csv("s3://<basepath>/*/*.csv")
步骤 2.
添加包含输入文件名的列,以便能够从路径中提取日期,并在后续步骤中使用它来生成输出文件夹
val input2DF = inputDF.withColumn("filename", input_file_name())
步骤 3.
为要用作输出分区的每列添加列(例如年、月、日或整个日期)
val input3DF = input2DF.withColumn("year", substr(filename, 60, 4))
.withColumn("date", substr(filename, 60, 10))
这只是一个简单的例子,您可以更有创意并使用正则表达式函数。这取决于您的路径结构。
步骤 4.
删除不必要的列。
val outputDF = input3DF.drop("filename")
第 5 步。
使用Parquet格式将DataFrame数据写入tarteg文件夹
outputDF.write.partitionBy("year", "date").parquet("s3://<targetpath>")
按照这种方法,文件将以如下文件夹结尾:
s3://<targetpath>/year=*2023*/date=*20230115*/<parquet-file>
注意事项:
name=value
的文件夹是定义分区列的标准。 Parquet reader 知道如何发现它们并根据这些值创建列。 Parquet - 分区发现