PySpark monotonically_increasing_id 结果在本地和 AWS EMR 上不同

问题描述 投票:0回答:1

我创建了一个小函数,它将为每一行分配一个复合 id,以便在给定子集大小的情况下将行本质上分组为更小的子集。在我的本地计算机上,逻辑运行完美。当我在 AWS EMR 上使用 PySpark 部署并测试 Spark 应用程序时,结果完全不同。

子集逻辑:

partition_column = "partition"
partitioned_df = dataframe.withColumn(
    partition_column, floor(monotonically_increasing_id() / subset_length)
)
partitioned_df_ids = (
    partitioned_df.select(partition_column)
    .distinct()
    .rdd.flatMap(lambda x: x)
    .collect()
)
for partition_id in partitioned_df_ids:
    temp_df = partitioned_df.filter(col(partition_column) == partition_id)
    dataframe_refs.append(temp_df)

给定这个函数和一个包含 77,700 行的数据帧,如果我将子集长度设置为 50,000,我将得到 2 个较小的数据帧。一个有 50,000 行,另一个有 27,700 行。 然而,当我针对 AWS EMR PySpark 进行测试时,我发现有更小的子集 ~26 个,每个子集不超过 3200 个。

python apache-spark pyspark amazon-emr distributed-computing
1个回答
0
投票

在本地执行时,您的数据可能只有一个分区

monotonically_increasing_id
为每个分区返回良好的 id 序列。因此,由于您只有一个分区(本地),因此您将获得一个适合您的逻辑工作的良好序列。

但是,当您有多个分区时,序列之间会有“间隙”。考虑这个例子:

from pyspark.sql import functions as F
df = spark.range(10) \
          .withColumn('mono_id', F.monotonically_increasing_id()) \
          .withColumn('partition_id', F.spark_partition_id()) \

df.show()
# +---+----------+------------+
# | id|   mono_id|partition_id|
# +---+----------+------------+
# |  0|         0|           0|
# |  1|         1|           0|
# |  2|         2|           0|
# |  3|         3|           0|
# |  4|         4|           0|
# |  5|8589934592|           1|
# |  6|8589934593|           1|
# |  7|8589934594|           1|
# |  8|8589934595|           1|
# |  9|8589934596|           1|
# +---+----------+------------+

spark_partition_id
结果显示
df
分布在2个分区之间。您可以看到
monotonically_increasing_id
在 INSIDE 分区内返回连续编号,但两个分区之间存在巨大差距。所以,因为这个“差距”,你的逻辑就行不通了。

这是一个具有三个分区的示例:

from pyspark.sql import functions as F
df = spark.range(10) \
          .repartition(3) \
          .withColumn('mono_id', F.monotonically_increasing_id()) \
          .withColumn('partition_id', F.spark_partition_id())

df.show()
# +---+-----------+------------+
# | id|    mono_id|partition_id|
# +---+-----------+------------+
# |  3|          0|           0|
# |  4|          1|           0|
# |  6|          2|           0|
# |  0| 8589934592|           1|
# |  1| 8589934593|           1|
# |  7| 8589934594|           1|
# |  8| 8589934595|           1|
# |  2|17179869184|           2|
# |  5|17179869185|           2|
# |  9|17179869186|           2|
# +---+-----------+------------+

如果将所有数据放入一个节点,您的代码就可以工作,但在分布式计算环境(即 Spark)中工作时,这并没有真正意义。

我认为您应该学习并应用重新分区以及可能的分桶逻辑,以便正确地将数据拆分为分区(如果您确实需要它,则取决于您的实际目的)。

© www.soinside.com 2019 - 2024. All rights reserved.