Spark配对rdd按键和组配对RDD并从每组中选择最新组

问题描述 投票:0回答:1

新的火花和斯卡拉。试着在下面实现。我的消息如下所示(key,id,version,dataObject)

val transformedRDD = processedMessages.flatMap(message => {
    message.isProcessed match {
      case true => Some(message.key, message.id, message.version, message)
      case false => None
    }
  }).groupByKey

我希望在每条消息上按ID分组并获取最新版本的消息,然后使用groupbykey,然后调用预定义的方法,如下所示

Ingest(key,RDD[dataObject])
scala apache-spark spark-streaming
1个回答
0
投票

在大多数情况下,你应该避免使用groupByKey,因为它可能导致重新洗牌,这可能非常昂贵。在您的用例中,您不需要groupByKey并且可以使用reduceByKey代替。

val transformedRDD = processedMessages
  // notice that we will have Rdd[(String, Message)] or PairRdd after this flatMap
  .flatMap(message => message.isProcessed match {
    case true => Some((message.id, message))
    case false => None
  })
  // after this reduction we will have latest message for each id
  .reduceByKey((m1: Message, m2: Message) => m1.version >= m2.version match {
    case true => m1
    case false => m2
  })
  // now we just want to keep message
  .map({ case (id, message) => message })
© www.soinside.com 2019 - 2024. All rights reserved.