我正在使用 Spark RDD 连接两个大数据集。一个数据集非常倾斜,因此很少有执行器任务需要很长时间才能完成工作。我该如何解决这种情况?
关于如何做到这一点的非常好的文章:https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
简短版:
假设您必须在 A.id=B.id 上连接两个表 A 和 B。假设表 A 在 id=1 上有偏差。
即从A中选择A.id并在A.id = B.id上加入B
有两种基本方法可以解决倾斜连接问题:
将您的查询/数据集分为两部分 - 一部分仅包含倾斜数据,另一部分包含非倾斜数据。 在上面的例子中。查询将变成 -
1. select A.id from A join B on A.id = B.id where A.id <> 1;
2. select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;
第一个查询不会有任何偏差,因此 ResultStage 的所有任务将大致在同一时间完成。
如果我们假设 B 只有几行且 B.id = 1,那么它将适合内存。因此第二个查询将转换为广播连接。这在 Hive 中也称为 Map-side join。
参考:https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization
然后可以合并两个查询的部分结果以获得最终结果。
LeMuBei 上面也提到,第二种方法尝试通过附加额外的列来随机化连接键。 步骤:
在较大的表 (A) 中添加一列,例如 skewLeft,并为所有行填充 0 到 N-1 之间的随机数。
在较小的表 (B) 中添加一列,例如 skewRight。将较小的表复制 N 次。因此,对于原始数据的每个副本,新 skewRight 列中的值将从 0 到 N-1 变化。为此,您可以使用explode sql/dataset 运算符。
在1和2之后,连接2个数据集/表,连接条件更新为-
*A.id = B.id && A.skewLeft = B.skewRight*
参考:https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
根据您遇到的特定偏差类型,可能有不同的方法来解决它。基本思想是:
如果倾斜数据参与join,李牧北的回答中引用的“Fighting the Skew In Spark”文章是一个很好的技术。就我而言,倾斜是由连接列中大量空值引起的。空值不参与连接,但由于 Spark 在连接列上进行分区,因此连接后分区非常倾斜,因为有一个巨大的分区包含所有空值。
我通过添加一个新列来解决这个问题,该列将所有空值更改为分布均匀的临时值,例如“NULL_VALUE_X”,其中 X 被 1 到 10,000 之间的随机数替换,例如(爪哇语):
// Before the join, create a join column with well-distributed temporary values for null swids. This column
// will be dropped after the join. We need to do this so the post-join partitions will be well-distributed,
// and not have a giant partition with all null swids.
String swidWithDistributedNulls = "swid_with_distributed_nulls";
int numNullValues = 10000; // Just use a number that will always be bigger than number of partitions
Column swidWithDistributedNullsCol =
when(csDataset.col(CS_COL_SWID).isNull(), functions.concat(
functions.lit("NULL_SWID_"),
functions.round(functions.rand().multiply(numNullValues)))
)
.otherwise(csDataset.col(CS_COL_SWID));
csDataset = csDataset.withColumn(swidWithDistributedNulls, swidWithDistributedNullsCol);
然后加入这个新列,然后加入之后:
outputDataset.drop(swidWithDistributedNullsCol);
参考https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/ 下面是使用 Pyspark dataframe API 对抗 Spark 中的偏差的代码
创建 2 个数据框:
from math import exp
from random import randint
from datetime import datetime
def count_elements(splitIndex, iterator):
n = sum(1 for _ in iterator)
yield (splitIndex, n)
def get_part_index(splitIndex, iterator):
for it in iterator:
yield (splitIndex, it)
num_parts = 18
# create the large skewed rdd
skewed_large_rdd = sc.parallelize(range(0,num_parts), num_parts).flatMap(lambda x: range(0, int(exp(x))))
skewed_large_rdd = skewed_large_rdd.mapPartitionsWithIndex(lambda ind, x: get_part_index(ind, x))
skewed_large_df = spark.createDataFrame(skewed_large_rdd,['x','y'])
small_rdd = sc.parallelize(range(0,num_parts), num_parts).map(lambda x: (x, x))
small_df = spark.createDataFrame(small_rdd,['a','b'])
将大 df 的数据分为 100 个 bin,并将小 df 复制 100 次
salt_bins = 100
from pyspark.sql import functions as F
skewed_transformed_df = skewed_large_df.withColumn('salt', (F.rand()*salt_bins).cast('int')).cache()
small_transformed_df = small_df.withColumn('replicate', F.array([F.lit(i) for i in range(salt_bins)]))
small_transformed_df = small_transformed_df.select('*', F.explode('replicate').alias('salt')).drop('replicate').cache()
最后连接避免倾斜
t0 = datetime.now()
result2 = skewed_transformed_df.join(small_transformed_df, (skewed_transformed_df['x'] == small_transformed_df['a']) & (skewed_transformed_df['salt'] == small_transformed_df['salt']) )
result2.count()
print "The direct join takes %s"%(str(datetime.now() - t0))
Apache DataFu 有两种执行倾斜连接的方法,它们实现了前面答案中的一些建议。
joinSkewed 方法会加盐(添加随机数列来分割倾斜值)。
broadcastJoinSkewed方法适用于将数据帧分为倾斜部分和规则部分的情况,如moriarty007答案中的方法2中所述。
DataFu 中的这些方法对于使用 Spark 2.x 的项目非常有用。如果您已经使用 Spark 3,那么有用于执行倾斜连接的专用方法(但是确保 AQE 确实适用于您的倾斜连接)。
完全披露 - 我是 Apache DataFu 的成员。
您可以尝试将“倾斜”的 RDD 重新分区到更多分区,或者尝试增加
spark.sql.shuffle.partitions
(默认为 200)。
在你的情况下,我会尝试将分区的数量设置为远高于执行器的数量。