How to design an abstract reader in Scala Spark?


The problem is the following: we want to parse several file types with custom logic. Suppose we have the types CSV, JSON and BIN. For this I created an enum using the enumeratum library:

import enumeratum._
import scala.collection.immutable

/**
  * Implementation of enumeratum for various input file types.
  * See more at: https://github.com/lloydmeta/enumeratum
  */
sealed trait InputFileType extends EnumEntry

object InputFileType extends Enum[InputFileType] {

  val values: immutable.IndexedSeq[InputFileType] = findValues

  case object CSV extends InputFileType

  case object JSON extends InputFileType

  case object BIN extends InputFileType

}

For each InputFileType there is a corresponding reader:

import org.apache.spark.input.PortableDataStream

class CSVReader(filename: String, file: String)
class JSONReader(filename: String, file: String)
class BINReader(filename: String, file: PortableDataStream)

Since they should share some common steps and have a similar structure, I would like to give them a superclass in which each reader has to implement those steps. However, the file input may differ, so I considered an ADT for it. Hence:

import org.apache.spark.input.PortableDataStream

sealed trait File
object File {

  case class Text(file: String) extends File

  case class BinaryStream(file: PortableDataStream) extends File

}

abstract class AbstractReader(filename: String, file: File) {
  def doStep1: Seq[String]
  def process: Result
}

class CSVReader(filename: String, file: String) extends AbstractReader(filename, File.Text(file)) {
  override def doStep1: Seq[String] = ???
  override def process: Result = ???
}

class JSONReader(filename: String, file: String) extends AbstractReader(filename, File.Text(file)) {
  override def doStep1: Seq[String] = ???
  override def process: Result = ???
}

class BINReader(filename: String, file: PortableDataStream) extends AbstractReader(filename, File.BinaryStream(file)) {
  override def doStep1: Seq[String] = ???
  override def process: Result = ???
}

// where Data and Result are, for example, defined in a package object:
case class Data(col1: String, col2: Int)
type Result = List[Data]

Now my idea is to create a SparkReader in order to read them into an RDD[Result]:

import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

class SparkReader(spark: SparkSession, fileType: String) extends Serializable {
  def readTextFile(path: String): RDD[(String, String)] = spark.sparkContext.wholeTextFiles(path)
  def readBinaryFile(path: String): RDD[(String, PortableDataStream)] = spark.sparkContext.binaryFiles(path)
}

object SparkReader {
  def apply(spark: SparkSession, fileType: String, path: String): RDD[Result] = InputFileType.withNameInsensitive(fileType) match {
    case InputFileType.CSV => new SparkReader(spark, fileType).readTextFile(path)
      .map { case (filename: String, file: String) => new CSVReader(filename, file).process }
    case InputFileType.JSON => new SparkReader(spark, fileType).readTextFile(path)
      .map { case (filename: String, file: String) => new JSONReader(filename, file).process }
    case InputFileType.BIN => new SparkReader(spark, fileType).readBinaryFile(path)
      .map { case (filename: String, file: PortableDataStream) => new BINReader(filename, file).process }
  }
}

However, I am not happy with this solution because it ends up with a lot of code duplication, and I would like a design that decides implicitly which reader to use. In the end I want to instantiate a class GenericReader:

class GenericReader(spark: SparkSession, fileType: String) extends Serializable {
  def read(path: String): RDD[Result] = ???
}

Then, when I call new GenericReader(spark, "csv").read("myPath"), it can derive that it has to read the data with spark.sparkContext.wholeTextFiles and apply the CSVReader. Or, when it is new GenericReader(spark, "bin").read("myPath"), it creates the RDD with spark.sparkContext.binaryFiles and applies the BINReader.

I also tried to apply the implicit design pattern, but without success. How can I solve this more elegantly?

scala apache-spark implicit
1 Answer

I prefer to use a type class approach.

First, the extra types needed for the example:

object types {
  type Result = List[String]
  type PortableDataStream = String
}

import types._

Create the sealed trait (the coproduct):

sealed trait InputFileType
case object CSV extends InputFileType
case object JSON extends InputFileType
case object BIN extends InputFileType

Create the Reader type class, represented as a trait with a type parameter:

trait Reader[A] {
  def doStep1: Seq[String]
  def process: Result
}

With a companion object that contains a "summoner" method, plus the instances available at compile time:

object Reader {
  def apply[A <: InputFileType](implicit reader: Reader[A]) = reader

  implicit object CSVReader extends Reader[CSV.type] {
    override def doStep1: Seq[String] = {
      val list = Seq("I am csvReader")
      println(list)
      list
    }

    override def process: Result = List("Result Csv")
  }

  implicit object JSONReader extends Reader[JSON.type] {
    override def doStep1: Seq[String] = {
      val list = Seq("I am jsonReader")
      println(list)
      list
    }

    override def process: Result = List("Result Json")
  }

  implicit object BINReader extends Reader[BIN.type] {
    override def doStep1: Seq[String] = {
      val list = Seq("I am binReader")
      println(list)
      list
    }

    override def process: Result = List("Result Bin")
  }

}

Your GenericReader can then be defined as:

class GenericReader[A](implicit reader: Reader[A]) {
   def process = reader.doStep1
}

Test code:

object SparkReader {

  import Reader._

  def main(args: Array[String]) : Unit = {
    new GenericReader[CSV.type]().process
    new GenericReader[JSON.type]().process
    new GenericReader[BIN.type]().process
  }
}

This prints:

List(I am csvReader)
List(I am jsonReader)
List(I am binReader)
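
Since the companion object's apply method acts as a summoner, an instance can also be obtained and used directly, without going through GenericReader. A small usage sketch based on the definitions above:

// Summon the CSV instance through the companion's apply and use it directly.
val csvReader: Reader[CSV.type] = Reader[CSV.type]
csvReader.doStep1                          // prints List(I am csvReader)
val csvResult: Result = csvReader.process  // List("Result Csv")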

There is plenty of information on how to use type classes in Scala, Scalaz and Cats.
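
To tie the type class approach back to the original Spark requirement, one possible extension is to let each instance also know how to load its own RDD, so that the type parameter selects both the Spark read method (wholeTextFiles vs binaryFiles) and the parser. The sketch below is only an illustration: the SparkRead trait, the csvRead/jsonRead/binRead instance names and the revised GenericReader signature are invented here, while InputFileType, CSVReader, JSONReader, BINReader and Result come from the question's code.

import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical type class: an instance knows how to load files of one
// InputFileType and turn each (filename, contents) pair into a Result.
trait SparkRead[A <: InputFileType] extends Serializable {
  def read(spark: SparkSession, path: String): RDD[Result]
}

object SparkRead {
  // Summoner, mirroring Reader.apply above.
  def apply[A <: InputFileType](implicit sr: SparkRead[A]): SparkRead[A] = sr

  implicit val csvRead: SparkRead[InputFileType.CSV.type] =
    new SparkRead[InputFileType.CSV.type] {
      def read(spark: SparkSession, path: String): RDD[Result] =
        spark.sparkContext.wholeTextFiles(path)
          .map { case (filename, file) => new CSVReader(filename, file).process }
    }

  implicit val jsonRead: SparkRead[InputFileType.JSON.type] =
    new SparkRead[InputFileType.JSON.type] {
      def read(spark: SparkSession, path: String): RDD[Result] =
        spark.sparkContext.wholeTextFiles(path)
          .map { case (filename, file) => new JSONReader(filename, file).process }
    }

  implicit val binRead: SparkRead[InputFileType.BIN.type] =
    new SparkRead[InputFileType.BIN.type] {
      def read(spark: SparkSession, path: String): RDD[Result] =
        spark.sparkContext.binaryFiles(path)
          .map { case (filename, stream) => new BINReader(filename, stream).process }
    }
}

// GenericReader only delegates to whichever instance the compiler resolves.
class GenericReader[A <: InputFileType](spark: SparkSession)(implicit sr: SparkRead[A]) {
  def read(path: String): RDD[Result] = sr.read(spark, path)
}

Usage would then look like new GenericReader[InputFileType.CSV.type](spark).read("myPath"). If the file type is only known at runtime as a String, one pattern match on InputFileType.withNameInsensitive(fileType) is still needed to pick the instance, but it lives in a single place instead of being repeated for every reader.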
