PySpark function for handling null values performs poorly - looking for optimization suggestions

Problem description

I have a PySpark function called fillnulls that handles null values in a dataset by filling in an appropriate value based on each column's type. However, I've noticed that the function's performance is far from optimal, especially on large datasets. I believe there may be a better approach or some optimizations that would make it more efficient.

Here is a brief overview of the fillnulls function:

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window
from py4j.protocol import Py4JJavaError


def fillnulls(df):
    for colname in df.columns:
        dtype = df.schema[colname].dataType
        colname_new = colname + "_filled"
        if isinstance(dtype, T.StringType):
            try:
                # count values in column
                df = df.withColumn("count", F.count(colname).over(Window.partitionBy(colname)))

                # get most frequent value
                df = df.withColumn("mode",
                                   F.array_max(F.collect_list(F.col("count")).over(Window.partitionBy(F.lit(1)))))

                # fill values according to the filling logic:
                # nulls -> most frequent value, values seen fewer than 100 times -> "other"
                # (chained into one when/otherwise so the mode fill is not overwritten)
                df = df.withColumn(colname_new,
                                   F.when(F.col(colname).isNull(), F.col("mode"))
                                    .when(F.col("count") < 100, F.lit("other"))
                                    .otherwise(F.col(colname)))

                # drop columns
                df = df.drop(colname, "count", "mode")
                df = df.withColumnRenamed(colname_new, colname)

            except Py4JJavaError:
                # if there're no values at all in the col
                df = df.fillna({colname: 'null'})
        else:
            try:
                # fill nulls with mean
                df = df.withColumn("mean", F.mean(colname).over(Window.partitionBy(F.lit(1))))
                df = df.withColumn(colname_new,
                                   F.when(F.col(colname).isNull(), F.col("mean")).otherwise(F.col(colname)))

                # drop columns
                df = df.drop(colname, "mean")
                df = df.withColumnRenamed(colname_new, colname)
            except Py4JJavaError:
                # if there're no values at all in the col
                df = df.fillna({colname: -1})
    return df

The function handles string columns differently from numeric columns. For string columns, it computes the most frequent value and uses it to fill nulls; it also replaces values that occur fewer than 100 times with "other". For numeric columns, it fills nulls with the column mean.
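For illustration, here is a tiny made-up example of the behavior I am after (the column names and data are hypothetical):

# A toy DataFrame, assuming an existing SparkSession named `spark`
df = spark.createDataFrame(
    [("a", 1.0), ("a", None), (None, 3.0), ("b", 4.0)],
    ["category", "amount"],
)

# Intended result of fillnulls(df):
#   category: nulls -> the most frequent value; values occurring fewer
#             than 100 times -> "other"
#   amount:   nulls -> the mean of the non-null values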

However, this function does not seem to perform well on large datasets. Processing takes a very long time, and I suspect there is a more efficient way to achieve the same result.

Could you review this function and suggest any improvements or optimizations that would make it faster and more efficient?

Also, if you have any best practices or alternative approaches for handling null values in PySpark, I'd be glad to hear about them.

python apache-spark pyspark
1 Answer
Score: 1

A lot of your computation can be handled by df.describe(), which computes the count, mean, stddev, min, and max for each numeric and string column in a single pass. With that in mind, we can build a map for df.fillna and return it:

from pyspark.sql import functions as F


def get_na_map(df):
    filler = {}

    # describe() calculates the count, mean, stddev, min, and max
    # for you, so start here
    stats = df.describe()

    # iterate over the dtypes rather than a lookup
    # on schema every time
    for c, type_ in df.dtypes:
        # this happens to me on boolean columns sometimes
        if c not in stats.columns:
            print(f'skipping {c} because no stats were calculated')
            continue

        # grab each columns stats that you want
        # I needed the select because the agg didn't
        # work the way I wanted
        col_stats = (
            stats
            .groupBy('summary', c)
            .pivot('summary')
            .agg(F.max(c))
            .select(*(F.max(col).alias(f'col_{col}') for col in ['count', 'max', 'mean']))
            .head(1)
        )[0]

        # count should always be an int
        count = int(col_stats.col_count)
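        # note: describe() does not report a true mode, so the column's
        # max value is used here as a stand-in for the most frequent value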
        mode = col_stats.col_max
        mean = col_stats.col_mean
        
        # handle the no records issue here
        # for string
        if type_ == 'string' and not count:
            filler[c] = 'null'
        elif type_ == 'string' and count < 100:
            filler[c] = 'other'
        elif type_ == 'string' and count >= 100:
            filler[c] = mode
        elif count:
            filler[c] = float(mean) if mean is not None else mean
        else:
            # no records case also handled here
            filler[c] = -1

    return filler


na_vals = get_na_map(df)

# see what your fill_values are
print(na_vals)

# apply the map here
df.fillna(na_vals).show()
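
For context, df.describe() produces a summary DataFrame with one row per statistic and one column per input column, with every value returned as a string; that is why get_na_map pivots it to pull a single column's count, max, and mean into one row. A rough sketch of its shape, with made-up column names:

# Shape of df.describe() for hypothetical columns "category" (string)
# and "amount" (double); mean and stddev are null for string columns
df.describe().show()
# +-------+--------+------+
# |summary|category|amount|
# +-------+--------+------+
# |  count|     ...|   ...|
# |   mean|    null|   ...|
# | stddev|    null|   ...|
# |    min|     ...|   ...|
# |    max|     ...|   ...|
# +-------+--------+------+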