(Spark skewed join)如何使用高度重复的密钥加入两个没有内存问题的大型Spark RDD?

问题描述 投票:3回答:2

this previous question,我试图通过避免使用join来避免Spark join的记忆问题。

在这个新问题中,我正在使用join,但试图修复内存问题。

这些是我的两个RDD:

  1. productToCustomerRDD: 大小:非常大,可能有数百万个不同的键 用HashPartitioner分区键 有些密钥会高度重复,有些则不会。 (toast, John) (butter, John) (toast, Jane) (jelly, Jane)
  2. productToCountRDD: 尺寸:非常大,可能有数百万个不同的键,对broadcast来说太大了 用HashPartitioner分区键 密钥是唯一的,价值是购买产品的客户数量。 (toast, 2) (butter, 1) (jelly, 1)

我想加入这两个RDD,结果将是:

  1. customerToProductAndCountRDD: (toast, (John, 2)) (butter, (John, 1)) (toast, (Jane, 2)) (jelly, (Jane, 1))

如果我用productToCustomerRDD.join(productToCountRDD)加入两个RDD,我会在两个分区上获得一个OutOfMemoryError(成千上万)。在Spark UI中,我注意到在包含join的阶段,在Input Size / Records列中,所有分区都有4K到700K的记录。除了产生OOM的两个分区之外的所有分区:一个有9M记录,一个有6M记录。

据我所知,为了加入,需要对具有相同密钥的对进行混洗并移动到同一分区(除非它们先前已被密钥分区)。但是,由于某些密钥非常频繁(例如:几乎每个客户都在数据集中购买的产品),大量数据可能会移动到一个分区,无论是在join期间还是在repartition之前,加入。

我理解正确吗? 有办法避免这种情况吗? 有没有办法让join没有同一分区上一个重复密钥的所有数据?

java apache-spark join rdd scalability
2个回答
2
投票

实际上,这是Spark中的标准问题,称为“倾斜连接”:连接的一侧是倾斜的,这意味着它的一些键比其他键更频繁。一些对我不起作用的答案可以找到here

我使用的策略的灵感来自定义GraphFrame.skewedJoin()here方法及其在ConnectedComponents.skewedJoin() here中的使用。通过使用广播连接加入最频繁的密钥和使用标准连接的较不频繁的密钥来执行连接。

在我的例子(OP)中,productToCountRDD已经包含关键频率的信息。所以它是这样的:

  • 过滤productToCountRDD只保留高于固定阈值的计数,并将collectAsMap()保留给驱动程序。
  • 将此映射广播给所有执行程序。
  • productToCustomerRDD拆分为两个RDD:在广播映射中找到的键(频繁键)和不是(不频繁键)的键。
  • 使用mapToPair执行频繁键的连接,从广播地图获取count
  • 使用join执行不常用键的连接。
  • 最后使用union来获得完整的RDD。

0
投票

我的第一个问题是:你真的需要theese的详细数据吗?你真的需要知道jhon买了2个toatat等等吗?我们处于大数据环境中,并且我们处理大量数据,因此有时聚合是减少基数并在分析和性能方面获得良好结果的好方法。因此,如果您想知道产品销售的次数,您可以使用pairRDD(产品,计数)[这样您将为每个产品提供一个元素]或者如果您想了解用户偏好,您可以使用pairRDD(用户,购买产品列表)[以这种方式,您将为每个用户提供一个元素]。如果你真的需要知道从Jhon购买吐司,你为什么要在不同的重新分区中分割吐司键?通过这种方式,您无法计算全局结果,因为在每个chunck中,您只有一条关于键的信息。

© www.soinside.com 2019 - 2024. All rights reserved.