如何在Apache Spark中将RDD [ParentClass]与RDD [Subclass]进行匹配?

问题描述 投票:3回答:1

我必须将rdd与它的类型匹配。

trait Fruit

case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends  Fruit

现在类型DStream[Fruit]的dstream来了。它是AppleMango

如何基于子类执行操作?类似于以下内容(无效):

dStream.foreachRDD{rdd:RDD[Fruit] =>
     rdd match {
       case rdd: RDD[Apple] =>
         //do something

       case rdd: RDD[Mango] =>
         //do something

       case _ =>
         println(rdd.count() + "<<<< not matched anything")
     }
    }
scala apache-spark spark-streaming subclass rdd
1个回答
1
投票

由于我们有一个RDD[Fruit],所以任何行都可以是AppleMango。当使用foreachRDD时,每个RDD将包含这些(以及其他可能的)类型的混合。

为了区分不同的类型,我们可以使用collect[U](f: PartialFunction[T, U]): RDD[U](不要与collect[U](f: PartialFunction[T, U]): RDD[U]混淆,后者会返回包含RDD中的元素的列表)。通过应用函数collect(): Array[T],此函数将返回包含所有匹配值的RDD(在这种情况下,我们可以在此处使用模式匹配)。

[下面是一个小的说明性示例(也将f添加到水果中)。

设置:

Orange

这将创建具有两个单独的val ssc = new StreamingContext(spark.sparkContext, Seconds(1)) val inputData: Queue[RDD[Fruit]] = Queue() val dStream: InputDStream[Fruit] = ssc.queueStream(inputData) inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11))) inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3))) RDD[Fruit]流。

RDD

在上面的dStream.foreachRDD{rdd: RDD[Fruit] => val mix = rdd.collect{ case row: Apple => ("APPLE", row.price) // do any computation on apple rows case row: Mango => ("MANGO", row.price) // do any computation on mango rows //case _@row => do something with other rows (will be removed by default). } mix foreach println } 中,我们略微更改每行(删除类),然后打印结果collect。结果:

RDD

可以看出,模式匹配保留并更改了包含// First RDD (MANGO,11) (APPLE,5) (APPLE,5) // Second RDD (MANGO,10) Apple的行,同时删除了所有Mango类。


单独的RDD

如果需要,还可以如下将两个子类分成各自的Orange。然后可以对这两个RDD进行任何计算。

RDD

完整示例代码

val apple = rdd.collect{case row: Apple => row}
val mango = rdd.collect{case row: Mango => row}
© www.soinside.com 2019 - 2024. All rights reserved.