我必须将rdd与它的类型匹配。
trait Fruit
case class Apple(price:Int) extends Fruit
case class Mango(price:Int) extends Fruit
现在类型DStream[Fruit]
的dstream来了。它是Apple
或Mango
。
如何基于子类执行操作?类似于以下内容(无效):
dStream.foreachRDD{rdd:RDD[Fruit] =>
rdd match {
case rdd: RDD[Apple] =>
//do something
case rdd: RDD[Mango] =>
//do something
case _ =>
println(rdd.count() + "<<<< not matched anything")
}
}
由于我们有一个RDD[Fruit]
,所以任何行都可以是Apple
或Mango
。当使用foreachRDD
时,每个RDD
将包含这些(以及其他可能的)类型的混合。
为了区分不同的类型,我们可以使用collect[U](f: PartialFunction[T, U]): RDD[U]
(不要与collect[U](f: PartialFunction[T, U]): RDD[U]
混淆,后者会返回包含RDD中的元素的列表)。通过应用函数collect(): Array[T]
,此函数将返回包含所有匹配值的RDD(在这种情况下,我们可以在此处使用模式匹配)。
[下面是一个小的说明性示例(也将f
添加到水果中)。
设置:
Orange
这将创建具有两个单独的val ssc = new StreamingContext(spark.sparkContext, Seconds(1))
val inputData: Queue[RDD[Fruit]] = Queue()
val dStream: InputDStream[Fruit] = ssc.queueStream(inputData)
inputData += spark.sparkContext.parallelize(Seq(Apple(5), Apple(5), Mango(11)))
inputData += spark.sparkContext.parallelize(Seq(Mango(10), Orange(1), Orange(3)))
的RDD[Fruit]
流。
RDD
在上面的dStream.foreachRDD{rdd: RDD[Fruit] =>
val mix = rdd.collect{
case row: Apple => ("APPLE", row.price) // do any computation on apple rows
case row: Mango => ("MANGO", row.price) // do any computation on mango rows
//case _@row => do something with other rows (will be removed by default).
}
mix foreach println
}
中,我们略微更改每行(删除类),然后打印结果collect
。结果:
RDD
可以看出,模式匹配保留并更改了包含// First RDD
(MANGO,11)
(APPLE,5)
(APPLE,5)
// Second RDD
(MANGO,10)
和Apple
的行,同时删除了所有Mango
类。
单独的RDD
如果需要,还可以如下将两个子类分成各自的Orange
。然后可以对这两个RDD
进行任何计算。
RDD
完整示例代码
val apple = rdd.collect{case row: Apple => row}
val mango = rdd.collect{case row: Mango => row}