使用PySpark 2.4.7
我的目标是将 PySpark DataFrame 写入 AWS S3 中特定数量的 parquet 文件。
假设我想将 PySpark DataFrame 写入 10 个 parquet 文件。 这就是我的做法
df.repartition(10).write.mode("append").partitionBy(partition_cols).saveAsTable(<db.table>)
这将为 S3 中的每个分区存储桶写入 10 个 parquet 文件。我想在所有分区列中总共写入 10 个(大约),我怎样才能实现这一点?
可以按照答案中提到的
Abdennacer Lachiheb
来实现。然而,根据我的评论,分区列内存在不平衡,因此简单地将文件总数除以分区列数并不是最佳答案。在这种情况下,我最终会得到 5 个文件,每个文件 10mb 用于 train
分区,5 个文件,每个文件 1mb 用于 valid
分区。我希望它具有相似的文件大小。此外,我可以通过使用分层来实现这一点,但想知道是否有更简单的方法来实现它。
我们知道文件数是所有分区上所有文件的总和:
nb_all_files = nb_files_per_partitions * len(partition_cols)
那么 nb_files_per_partitions = nb_all_files / len(partition_cols)
所以在你的情况下:
nb_files_per_partitions = 10 / len(partition_cols)
最终结果:
df.repartition(10/len(partition_cols)).write.mode("append").partitionBy(partition_cols).saveAsTable(<db.table>)
在写出数据框时可以使用
coalesce
:
df.coalesce(partition_count).write.parquet(<storage account path>)
据我从问题和评论中了解到,您的数据分区如下
/date=yyyyMMdd/
|-/train/
|-/validation/
对于训练和验证,您需要基于数据大小的文件编号。假设您有 1L 条记录。其中 70k 是训练,30k 是验证。在这种情况下,如果您设置
maxRecordsPerFile=10000
,您将有 7 个用于训练的文件和 3 个用于验证的文件。