根据指定的输出文件大小使用 AWS Glue 脚本对 DynamicFrames 进行分区

问题描述 投票:0回答:1

我有一系列 parquet 文件,这些文件已放入 S3 存储桶内;这些文件的大小从几 KB 到几 GB 不等。然而,这些文件中包含的数据是根据相同的架构构建的,因此我能够运行指向 S3 存储桶的 Glue Crawler,以在 Glue 数据库内创建一个表,其中包含所交付文件中包含的所有记录。

我现在希望对 Glue Crawler 创建的表进行重新分区,以便每个分区都具有最大给定文件大小(目前为 10MB),然后将新分区作为一组输出到我的 S3 存储桶中的目录中。 CSV 文件;本质上是将表格写入标准化的“块”。

我按照 Glue 文档的建议尝试了几种方法,但这两种方法似乎都会出现问题:

  1. 首先,我使用包含原始文件的S3存储桶的元数据来计算需要多少个输出文件,以便可以均匀地重新分区表,使每个分区小于10MB,然后将分区的DynamicFrame写入S3 。这是通过以下脚本完成的(忽略标准 AWS Glue 脚本序言):
S3_MEMORY_SIZE = 2e10
OUTFILE_SIZE = 1e7
 
# Define transformation function
def partititionTransform(glueContext, dynamic_frame, num) -> DynamicFrame:

    # convert to pyspark dataframe so we can specify the number of output files partitions
    data_frame = dynamic_frame.toDF().repartition(num)

    # Convert to AWS Glue dynamic frames
    dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "dynamic_frame")
    return dynamic_frame

 
# Fetch parquet from S3 bucket & convert to a AWS Glue dynamic frame
glue_table = glueContext.create_dynamic_frame_from_catalog(
    database='glue-test-db',
    table_name='parquet',
    )
    
# number of required output files
num_files = int(table_size / OUTFILE_SIZE) + 1

# Implement Transforms. Split & Convert input file
out_glue_table = partitionTransform(glueContext, glue_table, num_files)
 
# Upload CSV files to target s3 bucket with a new folder
glue_table = glueContext.write_dynamic_frame.from_options(
    frame=out_glue_table,
    connection_type="s3",
    format="csv",
    format_options={"writeHeader": True},
    connection_options={"path": "s3://exmaple-bucket/cleaned-data/"})
 
job.commit()

但是在运行时,Glue 会抛出以下错误:

Error Category: UNCLASSIFIED_ERROR; Py4JError: An error occurred while calling o131.repartition. Trace:

如果我不将表分区为

num
分区,而是将其硬编码为分区为 2,那么这种情况就会消失并按预期工作。因此,我将此错误解释为表明我请求了太多分区。

  1. 我尝试的第二种方法放弃了分区函数,而是尝试使用作业分区,将额外的
    boundedSize
    参数设置为
    '10000000'
    方法内部的
    create_dynamic_frame_from_catalog
    ,然后只是编写(我假设是分区)文件到 S3 位置:
glue_table = glueContext.create_dynamic_frame_from_catalog(
    database='glue-test-db',
    table_name='parquet',
    additional_options = {"boundedSize": "10000000"},
    )
  
glue_table = glueContext.write_dynamic_frame.from_options(
    frame=glue_table,
    connection_type="s3",
    format="csv",
    format_options={"writeHeader": True},
    connection_options={"path": "s3://example-bucket/cleaned-data/"})

虽然这在运行时不会引发错误并且作业实际上成功了,但查看目标 S3 目录显示已写入多个文件,但它们不是 131MB 就是 180MB。还有大约 20 个几百字节的文件,经检查仅包含 CSV 格式的表头。

AWS 文档似乎不太清楚,我没有发现任何迹象表明为什么这些解决方案似乎失败;尽管多个来源建议将这两种方法作为将较大的表分解为多个最大大小的文件的解决方案。

任何有关这些问题的原因或根源的建议将不胜感激。

目前这只是一个尝试使用 Glue 功能的实验,如果采用该功能,将用于处理总计数百 GB 的数据。因此,我意识到,在生产中,由于执行程序内存不足,将完整表读入内存并尝试将其转换为 Spark 数据帧以执行

.repartition
方法(如选项 1)可能不可行。因此,任何考虑到这一潜在未来障碍的建议都将受到更赞赏。

python amazon-web-services aws-glue
1个回答
0
投票

我有一些与你想要的类似的东西 达到。 但就我而言,我每天都会生成大量 100kb 到 200Mb 之间的小文件(或多或少 500 万个)。 月底我会合并成大文件(每个 5Gb)。

建议不要使用小于 128Mbs 的 parquet 文件,但对于外部表,较大的文件比较小的文件更好,这将使查询运行得更快。

这里是实现的代码:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

job.init(args['JOB_NAME'], args)
#input data 

df = glueContext.create_dynamic_frame.from_options(connection_type="parquet", connection_options={'paths': ["s3://my-bucket/test01/partition=something/year=2024/month=02/"]})
partitioned_df=df.toDF().repartition(1)
partitioned_dynamic_df=DynamicFrame.fromDF(partitioned_df,glueContext,"partitioned_df")
#output data
datasink0=glueContext.write_dynamic_frame.from_options(frame=partitioned_dynamic_df,connection_type="s3", connection_options={'path':"s3://my-bucket/merged_month/partition=something/year=2024/month=02/"}, format="parquet")

job.commit()
© www.soinside.com 2019 - 2024. All rights reserved.