在 pyspark 中,在数据帧上调用操作时,每个操作都会多次调用转换函数

问题描述 投票:0回答:1

我在 AWS Glue(4.0) 交互式会话上创建了 PySpark 脚本,主要用于数据验证。

  • 使用 Glue Dynamic Frame 读取数据帧
initial_df = glueContext.create_dynamic_frame.from_options(
  format_options = {'quoteChar':'"', 'withHeader': True, 'separator': ', '},
  connection_type='s3',
  format='csv',
   connection_options={ 'paths': ['<s3_path_of_file>'], 'recurse': True}
  • 使用ApplyMapping.apply()完成转换,用于重命名列并更改列的模式
transformed_df = ApplyMapping.apply(
  frame=initial_df,
  mappings = [ 
     'col1', 'string', 'col1', 'integer'
     'col2', 'string', 'col2', 'float'
 ]
  • 其中一个转换是一个自定义函数,用于检查给定类型(整数、字符串、浮点...)的每个单元格值。当对此转换后的数据帧调用操作时,就会出现问题
def func1(row):
        result = dict()
        for col, val in row.asDict().items():
            if (isnan(val)) or (isinstance(val, schema_dict.get(col))):
                res['Success'] = np.nan
            else:
                res['Failure'] = f'cell: {val} failed during validation'
        updated_row = Row(**row.asDict(), dataqualityresult=result)
        return updated_row

spark_df = transformed_df.toDF()
validated_df = spark_df.rdd.map(func1)
new_df = validated_df.toDF()
  • 然后我将 dataqualityresult 分解为 2 个不同的列 eval_status 和 eval_result
exploded_df = new_df.select(
        col1, 
        col2, 
        explode('dataqualityresult').alias('eval_status', 'eval_result')
)
exploded_df.count()
  • 添加操作(计数、显示、收集)时,func1 再次被触发并导致 isinstance 方法出错。我尝试了cache()和persist(),但没有成果。此外,还有其他基于成功/失败的转换(过滤器)
success_df = exploded_df.filter(new_df.eval_status == 'Success')
success_df.count()
  • 我仍然想知道为什么在应用缓存/持久后它会触发该功能? 有人可以帮我吗?

  • 仅供参考,尝试使用UDF以相同的结果结束

带有附加列的转换后的数据框,并根据 eval_status 获取计数以进行进一步转换。

apache-spark pyspark aws-glue
1个回答
0
投票

我能够采取解决方法。我使用 rdd.map(lambda x: function(x)) 遍历数据帧中的每一行并执行一些计算,然后使用过滤器来获取所需的数据,然后对过滤后的数据应用计数以进行进一步处理。 这解决了我的用例。

© www.soinside.com 2019 - 2024. All rights reserved.