Trait中的重载方法出错 Spark Scala

问题描述 投票:0回答:1

我有一些代码

// Abstraction over a data source (e.g. local filesystem, S3).
// NOTE(review): both overloads take exactly the same value-parameter list
// (SparkSession, String, String); they differ only in the type parameter and
// its implicit Encoder. A call like reader.read(spark, "json", path) without
// an explicit type argument is therefore ambiguous to the compiler — this is
// the "cannot resolve overloaded method read" error described below.
trait Reader {
  // Untyped variant: returns a generic DataFrame.
  def read(spark: SparkSession, format: String, path: String): DataFrame
  // Typed variant: converts the loaded data to Dataset[T] via the implicit Encoder.
  def read[T: Encoder](spark: SparkSession, format: String, path: String): Dataset[T]
}

// Reader implementation that loads data from a classpath resource.
// Both overloads must be implemented because the trait declares both —
// part of the duplication the question is asking how to avoid.
class LocalReader extends Reader {

  // Typed read: loads the resource and converts rows to T using the implicit Encoder.
  // NOTE(review): getClass.getResource(path) returns null when the resource is
  // missing, so .getPath would then throw a NullPointerException — consider
  // validating the resource exists.
  override def read[T: Encoder](spark: SparkSession, format: String, path: String): Dataset[T] = {
    spark.read
      .format(format)
      .option("header", "true")  // "header" only affects formats like CSV; harmless otherwise
      .load(getClass.getResource(path).getPath)
      .as[T]
  }

  // Untyped read: identical loading logic, but returns the raw DataFrame.
  override def read(spark: SparkSession, format: String, path: String): DataFrame = {
    spark.read
      .format(format)
      .option("header", "true")
      .load(getClass.getResource(path).getPath)
  }
}


// Spark job entry point. NOTE(review): the snippet is truncated — the object's
// closing brace and the transform(...) definition are not shown here.
object TopNSimilarCustomers extends SparkJob {
  override def appName: String = "TopNSimilarCustomers"

  override def run(spark: SparkSession, args: Array[String], reader: Reader): Unit = {

    /**
      * Only I/O here
      */

    // Bail out early when no arguments were supplied (args(0) is required below).
    if (args.length == 0)
      return
    // This is the call the compiler rejects: with both read overloads taking the
    // same value parameters, it cannot pick one without an explicit type argument.
    val rawData = reader.read(spark, "json", "/spark-test-data.json")
    // NOTE(review): res is computed but never used or written anywhere visible.
    val res     = transform(spark, rawData, args(0))

  }

在 val rawData = reader.read(spark, "json", "/spark-test-data.json") 这一行我收到一个编译错误:cannot resolve overloaded method read(无法解析重载方法 read)。

所以我想为不同用途准备不同的 Reader/Writer,比如 LocalReader、S3Reader。由于读取既可能返回 DataFrame 也可能返回 Dataset,我写了一个重载方法,即使每次只会用到其中一个,最后也不得不同时实现这两个方法。有什么办法可以避免吗?

有其他方法或更好的方法吗? 如何解决这个错误?

java scala apache-spark overloading
1个回答
1
投票

出现 cannot resolve overloaded method read 错误的原因是:Reader 特质中声明了两个同名方法,而且两个方法的值参数列表完全相同,编译器在调用处无法确定该选哪一个。

为了解决这个问题,将方法的名字重命名,比如说 readDF & readDS 或者你也可以查看下面的代码&根据你的要求进行修改。

    /** Where and how to load input data (Spark format name, path, reader options). */
    case class ReadConfig(
      format: String,
      path: String,
      options: Map[String, String]
    )

    /** Where and how to persist output data (Spark format name, path, writer options). */
    case class WriteConfig(
      format: String,
      path: String,
      options: Map[String, String]
    )

    /** Full job configuration: one read side and one write side. */
    case class Config(
      read: ReadConfig,
      write: WriteConfig
    )

    // Sink abstraction: persists a DataFrame; destination details come from the
    // implementing class's configuration, not from the caller.
    trait Writer {
      def write(df: DataFrame): Unit
    }
    // Source abstraction: a single, unambiguous read method returning a DataFrame.
    // Callers who need a typed Dataset[T] can apply .as[T] themselves — this is
    // how the answer avoids the overload ambiguity from the question.
    trait Reader {
      def read: DataFrame
    }

    // Combined reader/writer contract: implementations supply the SparkSession
    // and the Config that drives both the read and the write side.
    trait RW extends Reader with Writer {
      val spark : SparkSession
      val config : Config
    }

    // Add logic for Local
    class Local(override val spark: SparkSession,override val config: Config) extends RW {
      override def read: DataFrame = {
        spark.read
          .format(config.read.format)
          .options(config.read.options)
          .load(config.read.path)
      }
      override def write(df: DataFrame): Unit = {
        df.write
          .format(config.write.format)
          .options(config.write.options)
          .save(config.write.path)
      }
    }

// Add logic for S3
 // S3 implementation of RW; reading and writing are both fully config-driven.
 class S3(override val spark: SparkSession, override val config: Config) extends RW {

   /** Reads a DataFrame using the configured format, options and path. */
   override def read: DataFrame =
     spark.read
       .options(config.read.options)
       .format(config.read.format)
       .load(config.read.path)

   /** Writes the DataFrame to the configured destination. */
   override def write(df: DataFrame): Unit =
     df.write
       .options(config.write.options)
       .format(config.write.format)
       .save(config.write.path)
 }
© www.soinside.com 2019 - 2024. All rights reserved.