SparkSQL查询计划中的HashAggregate。

问题描述 投票:0回答:1

我只是想了解SparkSQL(2.4)中生成的查询计划。我有以下查询和它对应的查询计划(如下)。该查询只是一个测试查询)。

create temporary view tab_in as
select distinct 
       mth_id 
from tgt_tbl;

select /*+ BROADCAST(c) */
a.mth_id,
a.qtr_id,
a.prod_id,
a.sale_date
from my_table a
left anti join tab_in c
on a.mth_id = c.mth_id;

解释一下计划。

+- *(3) Project [mth_id#652, qtr_id#653, prod_id#655, sale_dt#656]
   +- *(3) BroadcastHashJoin [mth_id#652], [mth_id#867], LeftAnti, BuildRight
      :- *(3) Project [mth_id#652, qtr_id#653, sale_dt#656, prod_id#655]
      :  +- *(3) Filescan parquet test_db.my_table[mth_id#652, qtr_id#653, prod_id#655, sale_dt#656] Batched: true, Format: Parquet, Location: CatalogFileIndex[s3://test-data/my_table/0], PartitionCount: 1, PartitionFilters: [], PushedFilters: [], ReadSchema ......
      +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
         +- *(2) HashAggregate(keys=[mth_id#867], functions=[], output=[mth_id#867]
            +- Exchange hashpartitioning(mth_id#867, 200)
               +- *(1) HashAggregate(keys=[mth_id#867], functions=[], output=[mth_id#867])
                  +- *(1) Project [mth_id#867]
                     +- *(1) Filter isnotnull(mth_id#867)
                        +- *(1) FileScan parquet test_db.my_table[mth_id#867] Batched:true, Format:  Parquet, Location: InMemoryFileIndex[s3://test-data/tgt_tbl/20200609], PartitionFilters: [], PushedFilters: [IsNotNull(mth_id)], ReadSchema struct<mth_id:int>

从上面可以看出,有2个... HashAggregates 在计划中执行 - 1 前后 1 之后 Exchange HashPartitioning. 我想,第一个 HashAggregate 的存在,可能是由于 DISTINCT 子句,但我似乎无法理解第二个查询的原因。HashPartitioning (交换后)。

我试着把两个查询合并成一个查询,把第一个查询放在一个 WITH CTE 子句,但还是得到了同样的结果。

谁能解释一下第二条(从下往上读)的必要性?HashAggregate.

任何帮助是感激。谅谅

apache-spark-sql sql-execution-plan
1个回答
1
投票

HashAggregates 在计划中是由于重复数据删除(distinct). 该 HashAggregate 通常是一对。这里第一个负责每个执行者的本地重复数据删除。之后是 Exchange - 的数据必须进行洗牌,并在第二个 HashAggregate 负责洗牌后最后的重复数据删除。

© www.soinside.com 2019 - 2024. All rights reserved.