Spark RDD 分区程序在 RDD 中找不到分区

问题描述 投票:0回答:1

学习自定义Spark RDD Partitioner,编写了一些逻辑,但不编译。

在 Spark 2.4.3 中,启动 Spark shell :

case class Transaction(name:String, amount:Double, country:String)
val transactions = Seq(
 Transaction("Bob", 100, "UK"),
 Transaction("James", 15, "UK"),
 Transaction("Marek", 51, "US"),
 Transaction("Paul", 57, "US")
)

import org.apache.spark.Partitioner
class CountryPartitioner(override val numPartitions: Int) extends Partitioner { 
  def getPartition(key: Any): Int = key match { 
     case s: Transaction => s.country.hashCode % numPartitions  
  }  
  override def equals(other: Any): Boolean = other.isInstanceOf[CountryPartitioner]  
  override def hashCode: Int = 0
}

val rdd = sc.parallelize(transactions).partitionBy(new CountryPartitioner(2))

错误是

error: value partitionBy is not a member of org.apache.spark.rdd.RDD[Transaction]
       rdd.partitionBy(new CountryPartitioner(2))
           ^

我从网上了解到,这段代码可以正常运行,没有任何错误。我的代码与此代码几乎相同,区别在于 Transaction 类...不知道为什么我的代码不起作用。即使我也无法为此在线 RDD api。

import org.apache.spark.Partitioner
class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner { def getPartition(key: Any): Int = key match { case s: String => {if (s(0).toUpper > 'J') 1 else 0 } }
override def equals(other: Any): Boolean = other.isInstanceOf[TwoPartsPartitioner]
override def hashCode: Int = 0
}

var x = sc.parallelize(Array(("sandeep",1),("giri",1),("abhishek",1),("sravani",1),("jude",1)), 3)
var y = x.partitionBy(new TwoPartsPartitioner(2))

来源:https://gist.github.com/girisandeep/f90e456da6f2381f9c86e8e6bc4e8260

scala apache-spark rdd
1个回答
2
投票

这不起作用,因为您需要一个键值对才能使 RDD 分区发挥作用。 Spark 中的消息有时有点模糊。交易类不是 KV 对。

参见 使用自定义分区器在 Pyspark 中分区数据帧,另一个答案,不是我的。

RDD 上的很多操作都是面向 KV Pair 的,例如JOIN,不是特别方便。

© www.soinside.com 2019 - 2024. All rights reserved.