我有一些代码
trait Reader {
// NOTE(review): these two overloads have identical *explicit* parameter lists
// (SparkSession, String, String); the generic variant only adds an implicit
// Encoder[T] via the context bound. A call such as
// `reader.read(spark, "json", path)` with no explicit type argument is
// therefore ambiguous, and the compiler/IDE reports
// "cannot resolve overloaded method read". Renaming one of them
// (e.g. readDF / readDS) removes the ambiguity.
def read(spark: SparkSession, format: String, path: String): DataFrame
def read[T: Encoder](spark: SparkSession, format: String, path: String): Dataset[T]
}
class LocalReader extends Reader {

  /** Typed read: loads the classpath resource at `path` and converts it to a Dataset[T]. */
  override def read[T: Encoder](spark: SparkSession, format: String, path: String): Dataset[T] = {
    val resourcePath = getClass.getResource(path).getPath
    val raw = spark.read
      .format(format)
      .option("header", "true")
      .load(resourcePath)
    raw.as[T]
  }

  /** Untyped read: loads the classpath resource at `path` as a plain DataFrame. */
  override def read(spark: SparkSession, format: String, path: String): DataFrame = {
    val resourcePath = getClass.getResource(path).getPath
    spark.read
      .format(format)
      .option("header", "true")
      .load(resourcePath)
  }
}
object TopNSimilarCustomers extends SparkJob {
override def appName: String = "TopNSimilarCustomers"
// Entry point: wires I/O to the transformation; expects the query argument in args(0).
override def run(spark: SparkSession, args: Array[String], reader: Reader): Unit = {
/**
* Only I/O here
*/
// Guard: nothing to do when no argument was supplied.
if (args.length == 0)
return
// NOTE(review): this is the line that fails — both `read` overloads of Reader
// accept (SparkSession, String, String); the typed one differs only by an
// implicit Encoder, so overload resolution cannot pick one here.
val rawData = reader.read(spark, "json", "/spark-test-data.json")
// `transform` is defined elsewhere (not visible in this snippet); `res` is unused here.
val res = transform(spark, rawData, args(0))
}
我在 `val rawData = reader.read(spark, "json", "/spark-test-data.json")` 这一行收到一个错误:
不能解析重载方法 read(cannot resolve overloaded method read)。
我想针对不同用途编写不同的 Reader/Writer,例如 LocalReader、S3Reader。由于读取既可以返回 DataFrame 也可以返回 Dataset,所以我写了重载方法——即使每次只会用到其中一个,最后还是得同时实现这两个方法。有什么办法可以避免吗?
有其他方法或更好的方法吗? 如何解决这个错误?
出现 cannot resolve overloaded method read 的原因:
Reader 特质中有两个 read 方法,它们的显式参数列表完全相同(泛型版本只多出一个隐式的 Encoder 参数),编译器无法根据调用处的实参选出其中一个。
为了解决这个问题,可以给两个方法起不同的名字,比如 readDF 和 readDS。
或者你也可以查看下面的代码&根据你的要求进行修改。
/** Source settings: input format (e.g. "json"), path, and extra reader options. */
final case class ReadConfig(format: String, path: String, options: Map[String, String])

/** Sink settings: output format, path, and extra writer options. */
final case class WriteConfig(format: String, path: String, options: Map[String, String])

/** Aggregates the read and write halves of a job's I/O configuration. */
final case class Config(read: ReadConfig, write: WriteConfig)
trait Writer {
/** Persists the given DataFrame; destination and format come from the implementation. */
def write(df: DataFrame): Unit
}
trait Reader {
/** Loads a DataFrame; source location and format come from the implementation. */
def read: DataFrame
}
trait RW extends Reader with Writer {
// Implementations supply the Spark session and the read/write configuration.
val spark : SparkSession
val config : Config
}
// Add logic for Local
class Local(override val spark: SparkSession, override val config: Config) extends RW {

  /** Reads a DataFrame from the local filesystem as described by `config.read`. */
  override def read: DataFrame = {
    val rc = config.read
    spark.read.format(rc.format).options(rc.options).load(rc.path)
  }

  /** Writes `df` to the local filesystem as described by `config.write`. */
  override def write(df: DataFrame): Unit = {
    val wc = config.write
    df.write.format(wc.format).options(wc.options).save(wc.path)
  }
}
// Add logic for S3
class S3(override val spark: SparkSession, override val config: Config) extends RW {

  /** Reads a DataFrame from S3 as described by `config.read`. */
  override def read: DataFrame = {
    val rc = config.read
    spark.read.format(rc.format).options(rc.options).load(rc.path)
  }

  /** Writes `df` to S3 as described by `config.write`. */
  override def write(df: DataFrame): Unit = {
    val wc = config.write
    df.write.format(wc.format).options(wc.options).save(wc.path)
  }
}