I have a PySpark function called fillnulls that handles null values in a dataset by filling in an appropriate value based on each column's type. However, I've noticed that its performance is far from optimal, especially on large datasets, and I suspect there is a better approach or some optimization that would make it more efficient.

Here is a brief overview of the fillnulls function:
from py4j.protocol import Py4JJavaError
from pyspark.sql import functions as F, types as T
from pyspark.sql.window import Window


def fillnulls(df):
    for colname in df.columns:
        dtype = df.schema[colname].dataType
        colname_new = colname + "_filled"
        if isinstance(dtype, T.StringType):
            try:
                # count the occurrences of each value in the column
                df = df.withColumn("count", F.count(colname).over(Window.partitionBy(colname)))
                # most frequent value = the value whose occurrence count is highest
                df = df.withColumn(
                    "mode",
                    F.first(colname).over(Window.partitionBy(F.lit(1)).orderBy(F.desc("count"))),
                )
                # fill nulls with the mode and map rare values (< 100 occurrences) to "other"
                df = df.withColumn(
                    colname_new,
                    F.when(F.col(colname).isNull(), F.col("mode"))
                     .when(F.col("count") < 100, F.lit("other"))
                     .otherwise(F.col(colname)),
                )
                # drop the helper columns and restore the original column name
                df = df.drop(colname, "count", "mode")
                df = df.withColumnRenamed(colname_new, colname)
            except Py4JJavaError:
                # if there are no values at all in the column
                df = df.fillna({colname: 'null'})
        else:
            try:
                # fill nulls with the column mean
                df = df.withColumn("mean", F.mean(colname).over(Window.partitionBy(F.lit(1))))
                df = df.withColumn(
                    colname_new,
                    F.when(F.col(colname).isNull(), F.col("mean")).otherwise(F.col(colname)),
                )
                # drop the helper column and restore the original column name
                df = df.drop(colname, "mean")
                df = df.withColumnRenamed(colname_new, colname)
            except Py4JJavaError:
                # if there are no values at all in the column
                df = df.fillna({colname: -1})
    return df
The function treats string columns differently from numeric columns. For string columns it computes the most frequent value and fills nulls with it, and it also maps values that occur fewer than 100 times to "other". For numeric columns it fills nulls with the column mean.
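For example, on a tiny made-up DataFrame the intended behaviour would be something like this (a sketch; the column names and values are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
toy = spark.createDataFrame(
    [("a", 1.0), ("a", None), (None, 3.0)],
    ["category", "score"],
)
fillnulls(toy).show()
# intended result: the null in 'category' becomes the most frequent value ("a")
# and the null in 'score' becomes the column mean (2.0); the non-null strings
# map to "other" here only because every value occurs fewer than 100 times on
# a frame this small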
However, the function does not seem to perform well on large datasets. Processing takes a very long time, and I suspect there is a more efficient way to achieve the same result.

Could you review the function and suggest any improvements or optimizations to make it faster and more efficient?

Also, if you have any best practices or alternative approaches for handling null values in PySpark, I would be glad to hear about them.
Much of what you are computing is already handled by df.describe().
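For reference, this is roughly what it gives you (a sketch; the exact values depend on your data):

df.describe().show()
# describe() returns a small DataFrame with a 'summary' column whose rows are
# count, mean, stddev, min and max, plus one string-typed column for every
# numeric or string column of the input; for string columns, mean and stddev
# come back as null and min/max are the lexicographic extremes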
With that in mind, we can build a map for df.fillna and return it:
from pyspark.sql import functions as F


def get_na_map(df):
    filler = {}
    # describe() calculates count, mean, stddev, min and max
    # for every numeric and string column, so start here
    stats = df.describe()
    # iterate over the dtypes rather than doing a schema lookup
    # every time
    for c, type_ in df.dtypes:
        # this happens to me on boolean columns sometimes
        if c not in stats.columns:
            print(f'skipping {c} because no stats were calculated')
            continue
        # grab the stats you want for each column
        # I needed the select because the agg didn't
        # work the way I wanted
        col_stats = (
            stats
            .groupBy('summary', c)
            .pivot('summary')
            .agg(F.max(c))
            .select(*(F.max(col).alias(f'col_{col}') for col in ['count', 'max', 'mean']))
            .head(1)
        )[0]
        # count should always be an int
        count = int(col_stats.col_count)
        # the max serves as a stand-in for the mode here
        mode = col_stats.col_max
        mean = col_stats.col_mean
        # handle the no-records case here
        # for string columns
        if type_ == 'string' and not count:
            filler[c] = 'null'
        elif type_ == 'string' and count < 100:
            filler[c] = 'other'
        elif type_ == 'string' and count >= 100:
            filler[c] = mode
        elif count:
            filler[c] = float(mean) if mean is not None else mean
        else:
            # the no-records case for numeric columns is also handled here
            filler[c] = -1
    return filler
na_vals = get_na_map(df)
# see what your fill_values are
print(na_vals)
# apply the map here
df.fillna(na_vals).show()
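If you want to sanity-check the result, counting the nulls that remain in each column is cheap (a sketch, assuming functions is imported as F as above):

filled = df.fillna(na_vals)
# count the nulls left per column; every column covered by na_vals should show 0
filled.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in filled.columns]
).show()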