Sort-merge join strategy in PySpark: why is there still a sort step even though the data is pre-sorted?

Problem description: I create a table that is bucketed and sorted by id, then self-join it on id:
from pyspark.sql import functions as f

data = [(1, "Alice", "A"),
        (3, "Charlie", "A"),
        (2, "Bob", "B"),
        (4, "David", "B")]
schema = ["id", "name", "partition_key"]

df = spark.createDataFrame(data, schema=schema)

# Repartition by the bucket column, then write a table that is bucketed
# and sorted by id within each bucket (bucketBy requires saveAsTable).
df.repartition(2, f.col("id")).write.mode("overwrite") \
    .format("parquet") \
    .bucketBy(2, "id") \
    .sortBy("id") \
    .option("compression", "snappy") \
    .saveAsTable("bucketed_table_H")
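As a quick sanity check (illustrative, not part of the original question), the bucket and sort spec can be confirmed from the metastore:

# Inspect the table metadata; for a bucketed table the output should
# include "Num Buckets", "Bucket Columns" and "Sort Columns" rows.
spark.sql("DESCRIBE EXTENDED bucketed_table_H").show(100, truncate=False)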


spark.sql("""
select *
from bucketed_table_H a join bucketed_table_H b
on a.id = b.id
""").explain(True)

I get the following plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#24L], [id#27L], Inner
   :- Sort [id#24L ASC NULLS FIRST], false, 0
   :  +- Filter isnotnull(id#24L)
   :     +- FileScan parquet default.bucketed_table_h[id#24L,name#25,partition_key#26] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#24L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string,partition_key:string>, SelectedBucketsCount: 2 out of 2
   +- Sort [id#27L ASC NULLS FIRST], false, 0
      +- Filter isnotnull(id#27L)
         +- FileScan parquet default.bucketed_table_h[id#27L,name#28,partition_key#29] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file.., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string,partition_key:string>, SelectedBucketsCount: 2 out of 2 
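With data this small, Spark would normally prefer a broadcast hash join; if your own run shows BroadcastHashJoin instead of SortMergeJoin, you can disable broadcasting before re-running the explain (a standard config toggle, not part of the original question):

# Setting the broadcast threshold to -1 disables broadcast joins,
# forcing the planner to fall back to a sort-merge join here.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)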

Why does the plan still contain a sort step?

I tried the suggestions from this post: Why does Spark re-sort the data when the two joined tables are bucketed and sorted in the same way?

but the plan still shows the sort step.

apache-spark sorting join pyspark mergesort
1 Answer

There is no Exchange in the plan. That is the point: because both sides are bucketed on id, Spark can skip the shuffle entirely.

The Sort that remains is for the merge phase of SortMergeJoin, which needs both inputs ordered on the join key; the planner inserts it whenever it cannot prove that the scan's output is already sorted.

In other words, it is working as intended.
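To see what the bucketing bought you, compare with a non-bucketed copy of the same data (the table name plain_table_H below is hypothetical, and broadcast joins are assumed to be disabled as shown earlier). Its self-join plan should contain an Exchange under each side of the SortMergeJoin:

# Write the same rows as an ordinary, non-bucketed parquet table.
df.write.mode("overwrite").format("parquet").saveAsTable("plain_table_H")

spark.sql("""
select *
from plain_table_H a join plain_table_H b
on a.id = b.id
""").explain(True)
# Expect an Exchange hashpartitioning(id#..., 200) above each FileScan
# (200 being the default spark.sql.shuffle.partitions) -- that shuffle
# is exactly what bucketing eliminated.

Separately, if you want the remaining Sort to go away as well: Spark 3.x has a legacy flag, spark.sql.legacy.bucketedTableScan.outputOrdering, that lets the planner use the on-disk sortBy ordering, and it only helps when each bucket consists of a single file; check the migration guide for your version before relying on it.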
