我有一个小表(2k)记录和大表(500万)记录。我需要从小表中获取所有数据,并且只从大表中匹配数据,因此为了实现这一点,我执行了以下查询
select /*+ broadcast(small)*/ small.* From small left outer join large
虽然查询返回正确的结果,但当我检查查询计划时,它显示排序合并的广播哈希连接。
如果小桌是左桌不能播的话有什么限制吗?那出路又是什么?
由于您希望从小表而不是大表中选择整个数据集,因此 Spark 不会强制执行广播连接。但是,如果您更改连接顺序或转换为等连接,Spark 将很乐意启用广播连接。
例如:
原因: 原因是Spark将小表(也称为广播表)共享给大表数据所在的所有数据节点。在您的情况下,您需要小表中的所有数据,但只需要大表中的匹配数据。 Spark无法确定特定记录是否在另一个数据节点匹配或根本不匹配,因此如果小表是分布式的,则从小表中选择所有记录时会存在歧义。因此,Spark 在这种情况下不会使用广播连接。
通过广播左表来更改
order of the tables
,就像您正在做左连接一样,因此要广播右表(或)将连接类型更改为 right
。
select /*+ broadcast(small)*/ small.* From small right outer join large
select /*+ broadcast(small)*/ small.* From large left outer join small
Example:
df=spark.createDataFrame([(1,'a')],['id','name'])
df1=spark.createDataFrame([(1,'a')],['id','name'])
#broadcasting on right df1 and performing left join
df.join(broadcast(df1),['id'],'left').explain()
#== Physical Plan ==
#*(2) Project [id#0L, name#1, name#5]
#+- *(2) BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
# :- Scan ExistingRDD[id#0L,name#1]
# +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
# +- *(1) Filter isnotnull(id#4L)
# +- Scan ExistingRDD[id#4L,name#5]
#broadcasting df1 and right join defaults to Sortmerge join
df.join(broadcast(df1),['id'],'right').explain()
#== Physical Plan ==
#*(4) Project [id#4L, name#1, name#5]
#+- SortMergeJoin [id#0L], [id#4L], RightOuter
# :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
# : +- Exchange hashpartitioning(id#0L, 200)
# : +- *(1) Filter isnotnull(id#0L)
# : +- Scan ExistingRDD[id#0L,name#1]
# +- *(3) Sort [id#4L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#4L, 200)
# +- Scan ExistingRDD[id#4L,name#5]
回复 - “再次感谢,但这也是不可能的,因为我的大尺寸超过 10GB,我认为 Spark 不支持广播。如果有任何其他出路,请告诉我。 – S甘古利”
我的理解---->
将要广播的数据带入驱动节点。如果您尝试在驱动程序节点上传输 10GB 的数据,那么驱动程序将因内存不足错误而失败。