将 pyspark 数据帧写入所有分区列中总共特定数量的 parquet 文件中

问题描述 投票:0回答:3

使用PySpark 2.4.7

我的目标是将 PySpark DataFrame 写入 AWS S3 中特定数量的 parquet 文件。

假设我想将 PySpark DataFrame 写入 10 个 parquet 文件。 这就是我的做法

df.repartition(10).write.mode("append").partitionBy(partition_cols).saveAsTable(<db.table>)

这将为 S3 中的每个分区存储桶写入 10 个 parquet 文件。我想在所有分区列中总共写入 10 个(大约),我怎样才能实现这一点?

可以按照答案中提到的

Abdennacer Lachiheb
来实现。然而,根据我的评论,分区列内存在不平衡,因此简单地将文件总数除以分区列数并不是最佳答案。在这种情况下,我最终会得到 5 个文件,每个文件 10mb 用于
train
分区,5 个文件,每个文件 1mb 用于
valid
分区。我希望它具有相似的文件大小。此外,我可以通过使用分层来实现这一点,但想知道是否有更简单的方法来实现它。

amazon-web-services apache-spark pyspark apache-spark-sql parquet
3个回答
1
投票

我们知道文件数是所有分区上所有文件的总和:

nb_all_files = nb_files_per_partitions * len(partition_cols)

那么 nb_files_per_partitions = nb_all_files / len(partition_cols)

所以在你的情况下:

nb_files_per_partitions = 10 / len(partition_cols)

最终结果:

df.repartition(10/len(partition_cols)).write.mode("append").partitionBy(partition_cols).saveAsTable(<db.table>)

0
投票

在写出数据框时可以使用

coalesce

df.coalesce(partition_count).write.parquet(<storage account path>)

0
投票

据我从问题和评论中了解到,您的数据分区如下

/date=yyyyMMdd/
     |-/train/
     |-/validation/

对于训练和验证,您需要基于数据大小的文件编号。假设您有 1L 条记录。其中 70k 是训练,30k 是验证。在这种情况下,如果您设置

maxRecordsPerFile=10000
,您将有 7 个用于训练的文件和 3 个用于验证的文件。

© www.soinside.com 2019 - 2024. All rights reserved.