Spark 在读取时是否使用 repartition() 来推断 parquet 文件的分区是否持续存在?

问题描述 投票:0回答:1

我有两个数据集存储为镶木地板文件,其架构如下:

数据集1:

id col1 col2
1 v1 v3
2 v2 v4

数据集2:

id col3 col4
1 v5 v7
2 v6 v8

我想在 id 字段上使用 pyspark 连接两个数据集。这两个数据集都很大,有数百万条记录。这两个数据集对于任何

id
值都有一个条目。

我想以这样的方式存储数据集:当运行连接两个数据集的 pyspark 作业时,可以最小化洗牌。因此,在保留数据集 1 和数据集 2 之前,它们的分区如下:

dataset1.repartition(100, 'id')
dataset2.repartition(100, 'id')

id
值的数量很大,所以我不能
repartitionBy
id列,因为这会导致大量小文件。

根据我的理解,重新分区将确保相同的 id 进入 dataset1 和 dataset2 的相同分区号。如果我读取连接作业中的数据,如果数据集 1 的分区 1 和数据集 2 的分区 1 包含相同的 id,则可以轻松地将它们连接到内存中。但这并没有发生,我可以看到连接作业中发生了很多洗牌。

spark 是否从 parquet 文件推断出此分区信息?有没有办法让 Spark 推断出这些信息?有没有更好的方法将相同的id放在相同的分区号中,以便可以优化诸如连接之类的操作?

apache-spark pyspark parquet partitioning
1个回答
0
投票

Spark 可以从 parquet 文件推断分区信息,但它可能不是特定作业的最佳分区。在您的情况下,由于您已经根据“id”列重新分区了两个数据集,Spark 应该能够从 parquet 文件中读取分区信息。

但是,Spark 在连接操作期间使用的默认分区可能与您之前指定的分区不同。您可以使用

repartition
函数或
coalesce
函数显式设置连接操作的分区数。

例如,如果您想使用 100 个分区和“id”列连接两个数据集,您可以这样做:

joined_dataset = dataset1.join(dataset2.repartition(100, 'id'), 'id').repartition(100)

此代码确保两个数据集在连接操作之前以相同的方式重新分区。最后的

repartition(100)
确保最终结果也根据 'id' 列划分为 100 个分区。

或者,您也可以使用

coalesce
来减少连接操作后的分区数量:

joined_dataset = dataset1.join(dataset2.repartition(100, 'id'), 'id').coalesce(100)

此代码还确保两个数据集在连接操作之前以相同的方式重新分区。最后的

coalesce(100)
在连接操作后将分区数量减少到100。

© www.soinside.com 2019 - 2024. All rights reserved.