我有两个数据集存储为镶木地板文件,其架构如下:
数据集1:
id | col1 | col2 |
---|---|---|
1 | v1 | v3 |
2 | v2 | v4 |
数据集2:
id | col3 | col4 |
---|---|---|
1 | v5 | v7 |
2 | v6 | v8 |
我想在 id 字段上使用 pyspark 连接两个数据集。这两个数据集都很大,有数百万条记录。这两个数据集对于任何
id
值都有一个条目。
我想以这样的方式存储数据集:当运行连接两个数据集的 pyspark 作业时,可以最小化洗牌。因此,在保留数据集 1 和数据集 2 之前,它们的分区如下:
dataset1.repartition(100, 'id')
dataset2.repartition(100, 'id')
id
值的数量很大,所以我不能repartitionBy
id列,因为这会导致大量小文件。
根据我的理解,重新分区将确保相同的 id 进入 dataset1 和 dataset2 的相同分区号。如果我读取连接作业中的数据,如果数据集 1 的分区 1 和数据集 2 的分区 1 包含相同的 id,则可以轻松地将它们连接到内存中。但这并没有发生,我可以看到连接作业中发生了很多洗牌。
spark 是否从 parquet 文件推断出此分区信息?有没有办法让 Spark 推断出这些信息?有没有更好的方法将相同的id放在相同的分区号中,以便可以优化诸如连接之类的操作?
Spark 可以从 parquet 文件推断分区信息,但它可能不是特定作业的最佳分区。在您的情况下,由于您已经根据“id”列重新分区了两个数据集,Spark 应该能够从 parquet 文件中读取分区信息。
但是,Spark 在连接操作期间使用的默认分区可能与您之前指定的分区不同。您可以使用
repartition
函数或 coalesce
函数显式设置连接操作的分区数。
例如,如果您想使用 100 个分区和“id”列连接两个数据集,您可以这样做:
joined_dataset = dataset1.join(dataset2.repartition(100, 'id'), 'id').repartition(100)
此代码确保两个数据集在连接操作之前以相同的方式重新分区。最后的
repartition(100)
确保最终结果也根据 'id' 列划分为 100 个分区。
或者,您也可以使用
coalesce
来减少连接操作后的分区数量:
joined_dataset = dataset1.join(dataset2.repartition(100, 'id'), 'id').coalesce(100)
此代码还确保两个数据集在连接操作之前以相同的方式重新分区。最后的
coalesce(100)
在连接操作后将分区数量减少到100。