data = [(1, "Alice", "A"),
(3, "Charlie", "A"),
(2, "Bob", "B"),
(4, "David", "B")]
schema = ["id", "name", "partition_key"]
df = spark.createDataFrame(data, schema=schema)
df.repartition(2, f.col("id")).write.mode("overwrite")\
.format("parquet") \
.bucketBy(2, "id") \
.sortBy("id") \
.option("compression", "snappy") \
.saveAsTable("bucketed_table_H")
spark.sql("""
select *
from bucketed_table_H a join bucketed_table_H b
on a.id = b.id
""").explain(True)
我看到计划了:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [id#24L], [id#27L], Inner
:- Sort [id#24L ASC NULLS FIRST], false, 0
: +- Filter isnotnull(id#24L)
: +- FileScan parquet default.bucketed_table_h[id#24L,name#25,partition_key#26] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#24L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string,partition_key:string>, SelectedBucketsCount: 2 out of 2
+- Sort [id#27L ASC NULLS FIRST], false, 0
+- Filter isnotnull(id#27L)
+- FileScan parquet default.bucketed_table_h[id#27L,name#28,partition_key#29] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#27L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file.., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,name:string,partition_key:string>, SelectedBucketsCount: 2 out of 2
我为什么这还有排序步骤?
我尝试以下帖子:当两个表的连接以相同的方式存储和排序时,为什么 Spark 会重新排序数据?
但我看到计划仍然有排序步骤
计划中没有
Exchange
。这就是重点。
排序用于合并排序。
又说它工作正常。