无法压缩分区数量不等的 RDD。我可以用什么来替代 zip?

问题描述 投票:0回答:3

我有三个相同大小的 RDD

rdd1
包含一个字符串标识符,
rdd2
包含一个向量,
rdd3
包含一个整数值。

本质上,我想将这三个压缩在一起以获得

RDD[String,Vector,Int]
的RDD,但我不断地得到无法压缩分区数量不等的RDD。我怎样才能完全绕过zip来完成上述事情?

scala apache-spark rdd
3个回答
7
投票

尝试:

rdd1.zipWithIndex.map(_.swap).join(rdd2.zipWithIndex.map(_.swap)).values

1
投票

在拆分原始 RDD 之前,请使用

RDD.zipWithUniqueId
为每一行分配一个唯一的 id。然后确保在您从原始数据中生成的每个 RDD 中包含 id 字段,并将它们用作这些行的键(如果 id 还不是键,请使用
keyBy
),然后使用
RDD.join
重新组合行.

示例可能如下所示:

val rddWithKey = origionalRdd.zipWithUniqueID().map(_.swap)
val rdd1 = rddWithKey.map{case (key,value) => key -> value.stringField }
val rdd2 = rddWithKey.map{case (key,value) => key -> value.intField }

/*transformations on rdd1 and 2*/

val重组=rdd1.join(rdd2)


1
投票

它们的元素数量都相同吗?

zipPartitions
用于在特殊情况下连接 RDD,即它们具有“完全”相同数量的分区,并且每个分区中具有“完全”相同数量的元素。 您的案件没有此类保证。如果 rdd3 实际上是空的,你想做什么?你应该得到一个没有元素的 RDD 吗?

编辑:如果您知道长度完全相同,则 LostInOverflow 的答案将起作用。

    

© www.soinside.com 2019 - 2024. All rights reserved.