因此,首先,我有一个包含 2 列的数据框,
key
和 value
均为 String
类型
键 | 价值 |
---|---|
'钥匙1' | '值1' |
'键2' | '值2' |
等等等等。
数据帧是通过读取已知 s3 位置的 parquet 文件创建的。
my_df = spark.read.parquet('<some-s3-path>')
我首先尝试迭代数据帧的所有行,使用
查看哪些行的大小> 400Kb(或 400000 字节)rows = df.collect()
for rw in rows:
r = ''.join(rw[0:])
curr_size = sys.getsizeof(r)
if 400000 < curr_size:
print(curr_size)
现在,我想要实现的是如何从数据框中“过滤”出此类行,以仅包含大小为 < 400 Kb?
的行我当然可以从上面的函数创建一个
udf
,但我不确定如何在过滤器或where条件中使用它
意味着以下所有内容均已尝试
my_df.filter(get_size_udf(my_df) < 400000)
或
my_df.where(get_size_udf(my_df) < 40000)
或
filtered_df = my_df.withColumn("sizeOf", get_size(small_df))
但是,我缺乏知识限制了我找出其他解决方案。 因此,需要指点!
为了实现根据大小从 DataFrame 中过滤行的目标,您确实可以创建一个用户定义函数 (UDF) 并将其与
filter
函数一起使用。具体方法如下:
from pyspark.sql.functions import udf
import sys
def get_row_size(row):
r = ''.join(row)
return sys.getsizeof(r)
get_row_size_udf = udf(get_row_size)
filter
函数过滤掉大小大于 400 Kb 的行:filtered_df = my_df.filter(get_row_size_udf(my_df['key'], my_df['value']) < 400000)
这将创建一个新的 DataFrame
filtered_df
,其中仅包含连接的“键”和“值”列的大小小于 400 Kb 的行。
确保根据实际的 DataFrame 架构调整 UDF 中的列名称和过滤器函数。另外,请确保 UDF 中的大小计算逻辑准确符合您的要求。