在spark数据集中连接两个聚类表,似乎最终会出现完全洗牌的情况。

问题描述 投票:1回答:1

我有两个蜂巢集群表t1和t2。

CREATE EXTERNAL TABLE `t1`(
  `t1_req_id` string,
   ...
PARTITIONED BY (`t1_stats_date` string)
CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS
// t2 looks similar with same amount of buckets

插入部分发生在蜂巢中

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
insert overwrite table `t1` partition(t1_stats_date,t1_stats_hour)
   select *
   from t1_raw
   where t1_stats_date='2020-05-10' and t1_stats_hour='12' AND 
   t1_req_id is not null

代码看起来像下面。

 val t1 = spark.table("t1").as[T1]
 val t2=  spark.table("t2").as[T2]
 val outDS = t1.joinWith(t2, t1("t1_req_id) === t2("t2_req_id), "fullouter")
  .map { case (t1Obj, t2Obj) =>
    val t3:T3 = // do some logic
    t3 
  }
 outDS.toDF.write....

我在DAG中看到了投影 但似乎这项工作仍然是完整的数据洗牌 还有,在查看执行器的日志时 我没有看到它在一个块中读取两个表的同一个桶 - 这是我期望找到的

spark.sql.sources.bucketing.enabled, spark.sessionState.conf.bucketingEnabledspark.sql.join.preferSortMergeJoin 旗帜

我缺少了什么,为什么还有全盘洗牌,如果有桶状表呢?

enter image description hereenter image description here

scala apache-spark apache-spark-dataset
1个回答
0
投票

这里需要检查的一种可能性是你是否有类型不匹配。例如,如果连接列的类型在T1中是string,在T2中是BIGINT。即使类型都是整数(比如一个是int,另一个是bigint),Spark仍然会在这里添加shuffle,因为不同的类型使用不同的哈希函数来进行bucketing。

© www.soinside.com 2019 - 2024. All rights reserved.