根据 pyspark 数据框中每行的大小创建一个新列

问题描述 投票:0回答:1

因此,首先,我有一个包含 2 列的数据框,

key
value
均为
String

类型
价值
'钥匙1' '值1'
'键2' '值2'

等等等等。

数据帧是通过读取已知 s3 位置的 parquet 文件创建的。

my_df = spark.read.parquet('<some-s3-path>')

我首先尝试迭代数据帧的所有行,使用

查看哪些行的大小> 400Kb(或 400000 字节)
rows = df.collect()
for rw in rows:
  r = ''.join(rw[0:])
  curr_size = sys.getsizeof(r)
  if 400000 < curr_size:
     print(curr_size)

现在,我想要实现的是如何从数据框中“过滤”出此类行,以仅包含大小为 < 400 Kb?

的行

我当然可以从上面的函数创建一个

udf
,但我不确定如何在过滤器或where条件中使用它 意味着以下所有内容均已尝试

my_df.filter(get_size_udf(my_df) < 400000)

my_df.where(get_size_udf(my_df) < 40000)

filtered_df = my_df.withColumn("sizeOf", get_size(small_df))

但是,我缺乏知识限制了我找出其他解决方案。 因此,需要指点!

python dataframe pyspark boto3
1个回答
0
投票

为了实现根据大小从 DataFrame 中过滤行的目标,您确实可以创建一个用户定义函数 (UDF) 并将其与

filter
函数一起使用。具体方法如下:

  1. 定义一个计算行大小的UDF:
from pyspark.sql.functions import udf
import sys

def get_row_size(row):
    r = ''.join(row)
    return sys.getsizeof(r)

get_row_size_udf = udf(get_row_size)
  1. 使用 UDF 和
    filter
    函数过滤掉大小大于 400 Kb 的行:
filtered_df = my_df.filter(get_row_size_udf(my_df['key'], my_df['value']) < 400000)

这将创建一个新的 DataFrame

filtered_df
,其中仅包含连接的“键”和“值”列的大小小于 400 Kb 的行。

确保根据实际的 DataFrame 架构调整 UDF 中的列名称和过滤器函数。另外,请确保 UDF 中的大小计算逻辑准确符合您的要求。

© www.soinside.com 2019 - 2024. All rights reserved.