PySpark DataFrames 与 Glue DynamicFrames 的性能

问题描述 投票:0回答:1

所以我最近第一次开始使用 Glue 和 PySpark。任务是创建一个执行以下操作的 Glue 作业:

  1. 从驻留在 S3 存储桶中的 parquet 文件加载数据
  2. 对数据应用过滤器
  3. 添加一列,其值源自其他 2 列
  4. 将结果写入S3

由于数据从 S3 传输到 S3,我认为 Glue DynamicFrames 应该非常适合于此,我想出了以下代码:

def AddColumn(r):
   if r["option_type"] == 'S': 
       r["option_code_derived"]= 'S'+ r["option_code_4"]
   elif r["option_type"] == 'P': 
       r["option_code_derived"]= 'F'+ r["option_code_4"][1:]
   elif r["option_type"] == 'L':
       r["option_code_derived"]= 'P'+ r["option_code_4"]
   else:  
       r["option_code_derived"]= None
       
   return r

glueContext = GlueContext(create_spark_context(role_arn=args['role_arn']))
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": [source_path], "recurse" : True}, format = source_format, additional_options = {"useS3ListImplementation":True})

filtered_gdf = Filter.apply(frame = inputGDF, f = lambda x: x["my_filter_column"] in ['50','80'])

additional_column_gdf = Map.apply(frame = filtered_gdf, f = AddColumn) 

gdf_mapped = ApplyMapping.apply(frame = additional_column_gdf, mappings = mappings, transformation_ctx = "gdf_mapped") 

glueContext.purge_s3_path(full_target_path_purge, {"retentionPeriod": 0})

outputGDF = glueContext.write_dynamic_frame.from_options(frame = gdf_mapped, connection_type = "s3", connection_options = {"path": full_target_path}, format = target_format)

这可行,但需要很长时间(20 个 G1.X 工作人员只需要不到 10 个小时)。 现在,数据集相当大(近 20 亿条记录,超过 400 GB),但这仍然是出乎意料的(至少对我来说)。

然后我再次尝试,这次使用 PySpark DataFrames 而不是 DynamicFrames。 代码如下所示:

glueContext = GlueContext(create_spark_context(role_arn=args['role_arn'], source_bucket=args['s3_source_bucket'], target_bucket=args['s3_target_bucket']))
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

df = spark.read.parquet(full_source_path)

df_filtered = df.filter( (df.model_key_status  == '50') | (df.model_key_status  == '80') )

df_derived = df_filtered.withColumn('option_code_derived', 
     when(df_filtered.option_type == "S", concat(lit('S'), df_filtered.option_code_4))
    .when(df_filtered.option_type == "P", concat(lit('F'), df_filtered.option_code_4[2:42]))
    .when(df_filtered.option_type == "L", concat(lit('P'), df_filtered.option_code_4))
    .otherwise(None))

glueContext.purge_s3_path(full_purge_path, {"retentionPeriod": 0})

df_reorderered = df_derived.select(target_columns)

df_reorderered.write.parquet(full_target_path, mode="overwrite")

这也可行,但在其他方面相同的设置(20 个 G1.X 类型的工作人员,相同的数据集)下,这需要不到 20 分钟。

我的问题是:DynamicFrames 和 DataFrames 之间巨大的性能差异从何而来?我在第一次尝试中是否做了一些根本性错误?

pyspark aws-glue
1个回答
0
投票

不确定这是否是答案,但是这里他们解释说DynamicFrame处理模式,我知道它是DataFrame的包装器,每个记录都有某种元数据

DynamicFrame 与 DataFrame 类似,不同之处在于每个记录都是自描述的,因此最初不需要模式

© www.soinside.com 2019 - 2024. All rights reserved.