我有三个相同大小的 RDD
rdd1
包含一个字符串标识符,rdd2
包含一个向量,rdd3
包含一个整数值。
本质上,我想将这三个压缩在一起以获得
RDD[String,Vector,Int]
的RDD,但我不断地得到无法压缩分区数量不等的RDD。我怎样才能完全绕过zip来完成上述事情?
尝试:
rdd1.zipWithIndex.map(_.swap).join(rdd2.zipWithIndex.map(_.swap)).values
在拆分原始 RDD 之前,请使用
RDD.zipWithUniqueId
为每一行分配一个唯一的 id。然后确保在您从原始数据中生成的每个 RDD 中包含 id 字段,并将它们用作这些行的键(如果 id 还不是键,请使用 keyBy
),然后使用 RDD.join
重新组合行.
示例可能如下所示:
val rddWithKey = origionalRdd.zipWithUniqueID().map(_.swap)
val rdd1 = rddWithKey.map{case (key,value) => key -> value.stringField }
val rdd2 = rddWithKey.map{case (key,value) => key -> value.intField }
/*transformations on rdd1 and 2*/
val重组=rdd1.join(rdd2)
它们的元素数量都相同吗?
zipPartitions
用于在特殊情况下连接 RDD,即它们具有“完全”相同数量的分区,并且每个分区中具有“完全”相同数量的元素。
您的案件没有此类保证。如果 rdd3
实际上是空的,你想做什么?你应该得到一个没有元素的 RDD 吗?
编辑:如果您知道长度完全相同,则 LostInOverflow 的答案将起作用。