pyspark monotonically_increasing_id 奇怪的行为

问题描述 投票:0回答:1

我无法确定确切的原因,所以希望其他人知道。我创建了一个小函数,它会为每一行分配一个复合 id,以便在给定子集大小的情况下将行本质上分组为更小的子集。在我的本地计算机上,逻辑运行完美。当我在 AWS EMR 上使用 pyspark 部署并测试 Spark 应用程序时,结果完全不同。

子集逻辑:

partition_column = "partition"
partitioned_df = dataframe.withColumn(
    partition_column, floor(monotonically_increasing_id() / subset_length)
)
partitioned_df_ids = (
    partitioned_df.select(partition_column)
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect()
)
for partition_id in partitioned_df_ids:
    temp_df = partitioned_df.filter(col(partition_column) == partition_id)
    dataframe_refs.append(temp_df)

给定这个函数和一个包含 77,700 行的数据帧,如果我将子集长度设置为 50,000,我将得到 2 个较小的数据帧。 1 个有 50,000 行,1 个有 27,700 行。 然而,当我针对 AWS emr pyspark 进行测试时,我看到了更小的子集 ~26 个,每个子集不超过 3200 个。

无法弄清楚为什么会发生这种情况。在本地,它按预期运行。

python python-3.x pyspark amazon-emr
1个回答
0
投票

在本地执行时,您的数据可能只有一个分区。

monotonically_increasing_id
为每个分区返回良好的 id 序列。因此,由于您只有一个分区(本地),因此您将获得一个适合您的逻辑工作的良好序列。

但是,当您有多个分区时,序列之间会有“间隙”。考虑这个例子:

from pyspark.sql import functions as F
df = spark.range(10) \
          .withColumn('mono_id', F.monotonically_increasing_id()) \
          .withColumn('partition_id', F.spark_partition_id()) \

df.show()
# +---+----------+------------+
# | id|   mono_id|partition_id|
# +---+----------+------------+
# |  0|         0|           0|
# |  1|         1|           0|
# |  2|         2|           0|
# |  3|         3|           0|
# |  4|         4|           0|
# |  5|8589934592|           1|
# |  6|8589934593|           1|
# |  7|8589934594|           1|
# |  8|8589934595|           1|
# |  9|8589934596|           1|
# +---+----------+------------+

spark_partition_id
结果显示
df
分布在2个分区之间。您可以看到
monotonically_increasing_id
在 INSIDE 分区内返回连续编号,但两个分区之间存在巨大差距。所以,因为这个“差距”,你的逻辑就行不通了。

© www.soinside.com 2019 - 2024. All rights reserved.