在this previous question,我试图通过避免使用join
来避免Spark join
的记忆问题。
在这个新问题中,我正在使用join
,但试图修复内存问题。
这些是我的两个RDD:
HashPartitioner
分区键
有些密钥会高度重复,有些则不会。
(toast, John)
(butter, John)
(toast, Jane)
(jelly, Jane)
broadcast
来说太大了
用HashPartitioner
分区键
密钥是唯一的,价值是购买产品的客户数量。
(toast, 2)
(butter, 1)
(jelly, 1)
我想加入这两个RDD,结果将是:
(toast, (John, 2))
(butter, (John, 1))
(toast, (Jane, 2))
(jelly, (Jane, 1))
如果我用productToCustomerRDD.join(productToCountRDD)
加入两个RDD,我会在两个分区上获得一个OutOfMemoryError
(成千上万)。在Spark UI中,我注意到在包含join
的阶段,在Input Size / Records
列中,所有分区都有4K到700K的记录。除了产生OOM的两个分区之外的所有分区:一个有9M记录,一个有6M记录。
据我所知,为了加入,需要对具有相同密钥的对进行混洗并移动到同一分区(除非它们先前已被密钥分区)。但是,由于某些密钥非常频繁(例如:几乎每个客户都在数据集中购买的产品),大量数据可能会移动到一个分区,无论是在join
期间还是在repartition
之前,加入。
我理解正确吗?
有办法避免这种情况吗?
有没有办法让join
没有同一分区上一个重复密钥的所有数据?
实际上,这是Spark中的标准问题,称为“倾斜连接”:连接的一侧是倾斜的,这意味着它的一些键比其他键更频繁。一些对我不起作用的答案可以找到here。
我使用的策略的灵感来自定义GraphFrame.skewedJoin()
的here方法及其在ConnectedComponents.skewedJoin()
here中的使用。通过使用广播连接加入最频繁的密钥和使用标准连接的较不频繁的密钥来执行连接。
在我的例子(OP)中,productToCountRDD
已经包含关键频率的信息。所以它是这样的:
productToCountRDD
只保留高于固定阈值的计数,并将collectAsMap()
保留给驱动程序。productToCustomerRDD
拆分为两个RDD:在广播映射中找到的键(频繁键)和不是(不频繁键)的键。mapToPair
执行频繁键的连接,从广播地图获取count
join
执行不常用键的连接。union
来获得完整的RDD。我的第一个问题是:你真的需要theese的详细数据吗?你真的需要知道jhon买了2个toatat等等吗?我们处于大数据环境中,并且我们处理大量数据,因此有时聚合是减少基数并在分析和性能方面获得良好结果的好方法。因此,如果您想知道产品销售的次数,您可以使用pairRDD(产品,计数)[这样您将为每个产品提供一个元素]或者如果您想了解用户偏好,您可以使用pairRDD(用户,购买产品列表)[以这种方式,您将为每个用户提供一个元素]。如果你真的需要知道从Jhon购买吐司,你为什么要在不同的重新分区中分割吐司键?通过这种方式,您无法计算全局结果,因为在每个chunck中,您只有一条关于键的信息。