我创建了一个小函数,它将为每一行分配一个复合 id,以便在给定子集大小的情况下将行本质上分组为更小的子集。在我的本地计算机上,逻辑运行完美。当我在 AWS EMR 上使用 PySpark 部署并测试 Spark 应用程序时,结果完全不同。
子集逻辑:
partition_column = "partition"
partitioned_df = dataframe.withColumn(
partition_column, floor(monotonically_increasing_id() / subset_length)
)
partitioned_df_ids = (
partitioned_df.select(partition_column)
.distinct()
.rdd.flatMap(lambda x: x)
.collect()
)
for partition_id in partitioned_df_ids:
temp_df = partitioned_df.filter(col(partition_column) == partition_id)
dataframe_refs.append(temp_df)
给定这个函数和一个包含 77,700 行的数据帧,如果我将子集长度设置为 50,000,我将得到 2 个较小的数据帧。一个有 50,000 行,另一个有 27,700 行。 然而,当我针对 AWS EMR PySpark 进行测试时,我发现有更小的子集 ~26 个,每个子集不超过 3200 个。
monotonically_increasing_id
为每个分区返回良好的 id 序列。因此,由于您只有一个分区(本地),因此您将获得一个适合您的逻辑工作的良好序列。
但是,当您有多个分区时,序列之间会有“间隙”。考虑这个例子:
from pyspark.sql import functions as F
df = spark.range(10) \
.withColumn('mono_id', F.monotonically_increasing_id()) \
.withColumn('partition_id', F.spark_partition_id()) \
df.show()
# +---+----------+------------+
# | id| mono_id|partition_id|
# +---+----------+------------+
# | 0| 0| 0|
# | 1| 1| 0|
# | 2| 2| 0|
# | 3| 3| 0|
# | 4| 4| 0|
# | 5|8589934592| 1|
# | 6|8589934593| 1|
# | 7|8589934594| 1|
# | 8|8589934595| 1|
# | 9|8589934596| 1|
# +---+----------+------------+
spark_partition_id
结果显示df
分布在2个分区之间。您可以看到 monotonically_increasing_id
在 INSIDE 分区内返回连续编号,但两个分区之间存在巨大差距。所以,因为这个“差距”,你的逻辑就行不通了。
这是一个具有三个分区的示例:
from pyspark.sql import functions as F
df = spark.range(10) \
.repartition(3) \
.withColumn('mono_id', F.monotonically_increasing_id()) \
.withColumn('partition_id', F.spark_partition_id())
df.show()
# +---+-----------+------------+
# | id| mono_id|partition_id|
# +---+-----------+------------+
# | 3| 0| 0|
# | 4| 1| 0|
# | 6| 2| 0|
# | 0| 8589934592| 1|
# | 1| 8589934593| 1|
# | 7| 8589934594| 1|
# | 8| 8589934595| 1|
# | 2|17179869184| 2|
# | 5|17179869185| 2|
# | 9|17179869186| 2|
# +---+-----------+------------+
如果将所有数据放入一个节点,您的代码就可以工作,但在分布式计算环境(即 Spark)中工作时,这并没有真正意义。
我认为您应该学习并应用重新分区以及可能的分桶逻辑,以便正确地将数据拆分为分区(如果您确实需要它,则取决于您的实际目的)。