我在 AWS Glue(4.0) 交互式会话上创建了 PySpark 脚本,主要用于数据验证。
initial_df = glueContext.create_dynamic_frame.from_options(
format_options = {'quoteChar':'"', 'withHeader': True, 'separator': ', '},
connection_type='s3',
format='csv',
connection_options={ 'paths': ['<s3_path_of_file>'], 'recurse': True}
transformed_df = ApplyMapping.apply(
frame=initial_df,
mappings = [
'col1', 'string', 'col1', 'integer'
'col2', 'string', 'col2', 'float'
]
def func1(row):
result = dict()
for col, val in row.asDict().items():
if (isnan(val)) or (isinstance(val, schema_dict.get(col))):
res['Success'] = np.nan
else:
res['Failure'] = f'cell: {val} failed during validation'
updated_row = Row(**row.asDict(), dataqualityresult=result)
return updated_row
spark_df = transformed_df.toDF()
validated_df = spark_df.rdd.map(func1)
new_df = validated_df.toDF()
exploded_df = new_df.select(
col1,
col2,
explode('dataqualityresult').alias('eval_status', 'eval_result')
)
exploded_df.count()
success_df = exploded_df.filter(new_df.eval_status == 'Success')
success_df.count()
我仍然想知道为什么在应用缓存/持久后它会触发该功能? 有人可以帮我吗?
仅供参考,尝试使用UDF以相同的结果结束
带有附加列的转换后的数据框,并根据 eval_status 获取计数以进行进一步转换。
我能够采取解决方法。我使用 rdd.map(lambda x: function(x)) 遍历数据帧中的每一行并执行一些计算,然后使用过滤器来获取所需的数据,然后对过滤后的数据应用计数以进行进一步处理。 这解决了我的用例。