如何将具有不同长度的多个RDD合并为具有特定顺序模式的单个RDD?

问题描述 投票:-2回答:1

我有几个不同长度的RDD:

RDD1 : [a, b, c, d, e, f, g]
RDD2 : [1, 3 ,2, 44, 5]
RDD3 : [D, F, G]

我想用订购模式将它们组合成一个RDD:

每5行:从RDD1获得2行,从RDD2获得2行,然后从1获得1行RDD3

此模式应循环播放,直到所有RDD用尽。上面的输出应该是:

RDDCombine : [a,b,1,3,D,  c,d,2,44,F,  e,f,5,G, g]

如何实现?非常感谢!

背景:我正在设计一个推荐系统。现在,我有几种来自不同算法的RDD输出,我想以某种顺序模式将它们组合起来以提出混合推荐。

apache-spark rdd recommender-systems
1个回答
0
投票

我不会说这是一个最佳解决方案,但可能会帮助您入门。.同样,这还不是生产准备就绪的代码。另外,基于较少的数据,我使用的分区数为1,但是您可以对其进行编辑。

def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName("some")
    val sc = new SparkContext(conf)

    val rdd2 = sc.parallelize(Seq(1,3,2,44,5),1)
    val rdd1 = sc.parallelize(Seq('a','b','c','d','e','f','g'),1)
    val rdd3 = sc.parallelize(Seq('D','F','G'),1)

    val groupingCount = 2

    val rdd = rdd1.zipPartitions(rdd2,rdd3)((a,b,c) => {
      val ag = a.grouped(groupingCount)
      val bg = b.grouped(groupingCount)
      val cg = c.grouped(1)
      ag.zip(bg).zip(cg).map(x=> x._1._1 ++ x._1._2 ++x._2
      )
    })


    rdd.foreach(println)
    sc.stop()


  }
© www.soinside.com 2019 - 2024. All rights reserved.