Spark 中的倾斜数据集连接?

问题描述 投票:0回答:6

我正在使用 Spark RDD 连接两个大数据集。一个数据集非常倾斜,因此很少有执行器任务需要很长时间才能完成工作。我该如何解决这种情况?

join apache-spark
6个回答
29
投票

关于如何做到这一点的非常好的文章:https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

简短版:

  • 向大型 RDD 添加随机元素并用它创建新的连接键
  • 使用explode/flatMap向小RDD添加随机元素以增加条目数量并创建新的连接键
  • 在新的连接键上连接 RDD,由于随机播种,现在可以更好地分配

19
投票

假设您必须在 A.id=B.id 上连接两个表 A 和 B。假设表 A 在 id=1 上有偏差。

从A中选择A.id并在A.id = B.id上加入B

有两种基本方法可以解决倾斜连接问题:

方法一:

将您的查询/数据集分为两部分 - 一部分仅包含倾斜数据,另一部分包含非倾斜数据。 在上面的例子中。查询将变成 -

 1. select A.id from A join B on A.id = B.id where A.id <> 1;
 2. select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;

第一个查询不会有任何偏差,因此 ResultStage 的所有任务将大致在同一时间完成。

如果我们假设 B 只有几行且 B.id = 1,那么它将适合内存。因此第二个查询将转换为广播连接。这在 Hive 中也称为 Map-side join。

参考:https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization

然后可以合并两个查询的部分结果以获得最终结果。

方法2:

LeMuBei 上面也提到,第二种方法尝试通过附加额外的列来随机化连接键。 步骤:

  1. 在较大的表 (A) 中添加一列,例如 skewLeft,并为所有行填充 0 到 N-1 之间的随机数。

  2. 在较小的表 (B) 中添加一列,例如 skewRight。将较小的表复制 N 次。因此,对于原始数据的每个副本,新 skewRight 列中的值将从 0 到 N-1 变化。为此,您可以使用explode sql/dataset 运算符。

在1和2之后,连接2个数据集/表,连接条件更新为-

                *A.id = B.id && A.skewLeft = B.skewRight*

参考:https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/


16
投票

根据您遇到的特定偏差类型,可能有不同的方法来解决它。基本思想是:

  • 修改您的连接列,或创建一个新的连接列,该列不会倾斜,但仍保留足够的信息来执行连接
  • 在该非倾斜列上进行连接——生成的分区不会倾斜
  • 连接后,您可以将连接列更新回您的首选格式,或者如果您创建了新列,则将其删除

如果倾斜数据参与join,李牧北的回答中引用的“Fighting the Skew In Spark”文章是一个很好的技术。就我而言,倾斜是由连接列中大量空值引起的。空值不参与连接,但由于 Spark 在连接列上进行分区,因此连接后分区非常倾斜,因为有一个巨大的分区包含所有空值。

我通过添加一个新列来解决这个问题,该列将所有空值更改为分布均匀的临时值,例如“NULL_VALUE_X”,其中 X 被 1 到 10,000 之间的随机数替换,例如(爪哇语):

// Before the join, create a join column with well-distributed temporary values for null swids.  This column
// will be dropped after the join.  We need to do this so the post-join partitions will be well-distributed,
// and not have a giant partition with all null swids.
String swidWithDistributedNulls = "swid_with_distributed_nulls";
int numNullValues = 10000; // Just use a number that will always be bigger than number of partitions
Column swidWithDistributedNullsCol =
    when(csDataset.col(CS_COL_SWID).isNull(), functions.concat(
        functions.lit("NULL_SWID_"),
        functions.round(functions.rand().multiply(numNullValues)))
    )
    .otherwise(csDataset.col(CS_COL_SWID));
csDataset = csDataset.withColumn(swidWithDistributedNulls, swidWithDistributedNullsCol);

然后加入这个新列,然后加入之后:

outputDataset.drop(swidWithDistributedNullsCol);

4
投票

参考https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/ 下面是使用 Pyspark dataframe API 对抗 Spark 中的偏差的代码

创建 2 个数据框:

from math import exp
from random import randint
from datetime import datetime

def count_elements(splitIndex, iterator):
    n = sum(1 for _ in iterator)
    yield (splitIndex, n)

def get_part_index(splitIndex, iterator):
    for it in iterator:
        yield (splitIndex, it)

num_parts = 18
# create the large skewed rdd
skewed_large_rdd = sc.parallelize(range(0,num_parts), num_parts).flatMap(lambda x: range(0, int(exp(x))))
skewed_large_rdd = skewed_large_rdd.mapPartitionsWithIndex(lambda ind, x: get_part_index(ind, x))

skewed_large_df = spark.createDataFrame(skewed_large_rdd,['x','y'])

small_rdd = sc.parallelize(range(0,num_parts), num_parts).map(lambda x: (x, x))

small_df = spark.createDataFrame(small_rdd,['a','b'])

将大 df 的数据分为 100 个 bin,并将小 df 复制 100 次

salt_bins = 100
from pyspark.sql import functions as F

skewed_transformed_df = skewed_large_df.withColumn('salt', (F.rand()*salt_bins).cast('int')).cache()

small_transformed_df = small_df.withColumn('replicate', F.array([F.lit(i) for i in range(salt_bins)]))

small_transformed_df = small_transformed_df.select('*', F.explode('replicate').alias('salt')).drop('replicate').cache()

最后连接避免倾斜

t0 = datetime.now()
result2 = skewed_transformed_df.join(small_transformed_df, (skewed_transformed_df['x'] == small_transformed_df['a']) & (skewed_transformed_df['salt'] == small_transformed_df['salt']) )
result2.count() 
print "The direct join takes %s"%(str(datetime.now() - t0))

4
投票

Apache DataFu 有两种执行倾斜连接的方法,它们实现了前面答案中的一些建议。

joinSkewed 方法会加盐(添加随机数列来分割倾斜值)。

broadcastJoinSkewed方法适用于将数据帧分为倾斜部分和规则部分的情况,如moriarty007答案中的方法2中所述。

DataFu 中的这些方法对于使用 Spark 2.x 的项目非常有用。如果您已经使用 Spark 3,那么有用于执行倾斜连接的专用方法(但是确保 AQE 确实适用于您的倾斜连接)。

完全披露 - 我是 Apache DataFu 的成员。


0
投票

您可以尝试将“倾斜”的 RDD 重新分区到更多分区,或者尝试增加

spark.sql.shuffle.partitions
(默认为 200)。

在你的情况下,我会尝试将分区的数量设置为远高于执行器的数量。

© www.soinside.com 2019 - 2024. All rights reserved.