我无法确定确切的原因,所以希望其他人知道。我创建了一个小函数,它会为每一行分配一个复合 id,以便在给定子集大小的情况下将行本质上分组为更小的子集。在我的本地计算机上,逻辑运行完美。当我在 AWS EMR 上使用 pyspark 部署并测试 Spark 应用程序时,结果完全不同。
子集逻辑:
partition_column = "partition"
partitioned_df = dataframe.withColumn(
partition_column, floor(monotonically_increasing_id() / subset_length)
)
partitioned_df_ids = (
partitioned_df.select(partition_column)
.distinct()
.rdd.flatMap(lambda x: x)
.collect()
)
for partition_id in partitioned_df_ids:
temp_df = partitioned_df.filter(col(partition_column) == partition_id)
dataframe_refs.append(temp_df)
给定这个函数和一个包含 77,700 行的数据帧,如果我将子集长度设置为 50,000,我将得到 2 个较小的数据帧。 1 个有 50,000 行,1 个有 27,700 行。 然而,当我针对 AWS emr pyspark 进行测试时,我看到了更小的子集 ~26 个,每个子集不超过 3200 个。
无法弄清楚为什么会发生这种情况。在本地,它按预期运行。
在本地执行时,您的数据可能只有一个分区。
monotonically_increasing_id
为每个分区返回良好的 id 序列。因此,由于您只有一个分区(本地),因此您将获得一个适合您的逻辑工作的良好序列。
但是,当您有多个分区时,序列之间会有“间隙”。考虑这个例子:
from pyspark.sql import functions as F
df = spark.range(10) \
.withColumn('mono_id', F.monotonically_increasing_id()) \
.withColumn('partition_id', F.spark_partition_id()) \
df.show()
# +---+----------+------------+
# | id| mono_id|partition_id|
# +---+----------+------------+
# | 0| 0| 0|
# | 1| 1| 0|
# | 2| 2| 0|
# | 3| 3| 0|
# | 4| 4| 0|
# | 5|8589934592| 1|
# | 6|8589934593| 1|
# | 7|8589934594| 1|
# | 8|8589934595| 1|
# | 9|8589934596| 1|
# +---+----------+------------+
spark_partition_id
结果显示df
分布在2个分区之间。您可以看到 monotonically_increasing_id
在 INSIDE 分区内返回连续编号,但两个分区之间存在巨大差距。所以,因为这个“差距”,你的逻辑就行不通了。