所以我最近第一次开始使用 Glue 和 PySpark。任务是创建一个执行以下操作的 Glue 作业:
由于数据从 S3 传输到 S3,我认为 Glue DynamicFrames 应该非常适合于此,我想出了以下代码:
def AddColumn(r):
if r["option_type"] == 'S':
r["option_code_derived"]= 'S'+ r["option_code_4"]
elif r["option_type"] == 'P':
r["option_code_derived"]= 'F'+ r["option_code_4"][1:]
elif r["option_type"] == 'L':
r["option_code_derived"]= 'P'+ r["option_code_4"]
else:
r["option_code_derived"]= None
return r
glueContext = GlueContext(create_spark_context(role_arn=args['role_arn']))
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": [source_path], "recurse" : True}, format = source_format, additional_options = {"useS3ListImplementation":True})
filtered_gdf = Filter.apply(frame = inputGDF, f = lambda x: x["my_filter_column"] in ['50','80'])
additional_column_gdf = Map.apply(frame = filtered_gdf, f = AddColumn)
gdf_mapped = ApplyMapping.apply(frame = additional_column_gdf, mappings = mappings, transformation_ctx = "gdf_mapped")
glueContext.purge_s3_path(full_target_path_purge, {"retentionPeriod": 0})
outputGDF = glueContext.write_dynamic_frame.from_options(frame = gdf_mapped, connection_type = "s3", connection_options = {"path": full_target_path}, format = target_format)
这可行,但需要很长时间(20 个 G1.X 工作人员只需要不到 10 个小时)。 现在,数据集相当大(近 20 亿条记录,超过 400 GB),但这仍然是出乎意料的(至少对我来说)。
然后我再次尝试,这次使用 PySpark DataFrames 而不是 DynamicFrames。 代码如下所示:
glueContext = GlueContext(create_spark_context(role_arn=args['role_arn'], source_bucket=args['s3_source_bucket'], target_bucket=args['s3_target_bucket']))
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
df = spark.read.parquet(full_source_path)
df_filtered = df.filter( (df.model_key_status == '50') | (df.model_key_status == '80') )
df_derived = df_filtered.withColumn('option_code_derived',
when(df_filtered.option_type == "S", concat(lit('S'), df_filtered.option_code_4))
.when(df_filtered.option_type == "P", concat(lit('F'), df_filtered.option_code_4[2:42]))
.when(df_filtered.option_type == "L", concat(lit('P'), df_filtered.option_code_4))
.otherwise(None))
glueContext.purge_s3_path(full_purge_path, {"retentionPeriod": 0})
df_reorderered = df_derived.select(target_columns)
df_reorderered.write.parquet(full_target_path, mode="overwrite")
这也可行,但在其他方面相同的设置(20 个 G1.X 类型的工作人员,相同的数据集)下,这需要不到 20 分钟。
我的问题是:DynamicFrames 和 DataFrames 之间巨大的性能差异从何而来?我在第一次尝试中是否做了一些根本性错误?
不确定这是否是答案,但是这里他们解释说DynamicFrame处理模式,我知道它是DataFrame的包装器,每个记录都有某种元数据
DynamicFrame 与 DataFrame 类似,不同之处在于每个记录都是自描述的,因此最初不需要模式