The problem is as follows: we want to parse multiple file types with custom logic. Suppose we have the following types: CSV, JSON, BIN. For this, I created an enum using the enumeratum library:
```scala
import enumeratum._
import scala.collection.immutable

/**
 * Implementation of enumeratum for various input file types.
 * See more at: https://github.com/lloydmeta/enumeratum
 */
sealed trait InputFileType extends EnumEntry

object InputFileType extends Enum[InputFileType] {
  val values: immutable.IndexedSeq[InputFileType] = findValues

  case object CSV extends InputFileType
  case object JSON extends InputFileType
  case object BIN extends InputFileType
}
```
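For illustration, here is a minimal, library-free sketch of what `findValues` and `withNameInsensitive` give you (enumeratum generates this automatically; the hand-rolled `values` list and lookup below are stand-ins, not the real library code):

```scala
// Stand-in sketch of the lookup enumeratum generates; not the real library.
sealed trait InputFileType
object InputFileType {
  case object CSV  extends InputFileType
  case object JSON extends InputFileType
  case object BIN  extends InputFileType

  // enumeratum's findValues collects these case objects automatically
  val values: Seq[InputFileType] = Seq(CSV, JSON, BIN)

  // case-insensitive lookup, as provided by withNameInsensitive
  def withNameInsensitive(name: String): InputFileType =
    values.find(_.toString.equalsIgnoreCase(name))
      .getOrElse(throw new NoSuchElementException(s"$name is not a member"))
}

println(InputFileType.withNameInsensitive("csv"))  // CSV
```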
For each InputFileType there is a corresponding reader:
```scala
import org.apache.spark.input.PortableDataStream

class CSVReader(filename: String, file: String)
class JSONReader(filename: String, file: String)
class BINReader(filename: String, file: PortableDataStream)
```
Since they are supposed to share some common steps and have a similar structure, I want to give them a superclass in which each reader implements certain steps. However, the `file` input may differ, so I considered modeling it as an ADT. Hence:
```scala
import org.apache.spark.input.PortableDataStream

sealed trait File
object File {
  case class Text(file: String) extends File
  case class BinaryStream(file: PortableDataStream) extends File
}
import File._

abstract class AbstractReader(filename: String, file: File) {
  def doStep1: Seq[String]
  def process: Result
}

class CSVReader(filename: String, file: String)
    extends AbstractReader(filename, Text(file)) {
  override def doStep1: Seq[String] = ???
  override def process: Result = ???
}

class JSONReader(filename: String, file: String)
    extends AbstractReader(filename, Text(file)) {
  override def doStep1: Seq[String] = ???
  override def process: Result = ???
}

class BINReader(filename: String, file: PortableDataStream)
    extends AbstractReader(filename, BinaryStream(file)) {
  override def doStep1: Seq[String] = ???
  override def process: Result = ???
}

// where
case class Data(col1: String, col2: Int)
type Result = List[Data]
```
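To see how a shared step can dispatch on the file representation, here is a self-contained sketch of pattern matching on the `File` ADT. `Array[Byte]` stands in for Spark's `PortableDataStream` (which is only available inside a Spark job), and `rawLines` is a hypothetical common step, not part of the original design:

```scala
sealed trait File
object File {
  case class Text(file: String) extends File
  // Array[Byte] stands in for PortableDataStream in this sketch
  case class BinaryStream(file: Array[Byte]) extends File
}

// A hypothetical common step all readers could share:
// turn any File into its raw lines, regardless of representation.
def rawLines(f: File): List[String] = f match {
  case File.Text(s)         => s.split('\n').toList
  case File.BinaryStream(b) => new String(b, "UTF-8").split('\n').toList
}

println(rawLines(File.Text("a,1\nb,2")))  // List(a,1, b,2)
```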
Now my idea is to create a SparkReader to read them into an RDD[Result]:
```scala
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

class SparkReader(spark: SparkSession, fileType: String) extends Serializable {
  def readTextFile(path: String): RDD[(String, String)] =
    spark.sparkContext.wholeTextFiles(path)

  def readBinaryFile(path: String): RDD[(String, PortableDataStream)] =
    spark.sparkContext.binaryFiles(path)
}

object SparkReader {
  // note: path has to be a parameter here, since it is used below
  def apply(spark: SparkSession, fileType: String, path: String): RDD[Result] =
    InputFileType.withNameInsensitive(fileType) match {
      case InputFileType.CSV =>
        new SparkReader(spark, fileType).readTextFile(path)
          .map { case (filename, file) => new CSVReader(filename, file).process }
      case InputFileType.JSON =>
        new SparkReader(spark, fileType).readTextFile(path)
          .map { case (filename, file) => new JSONReader(filename, file).process }
      case InputFileType.BIN =>
        new SparkReader(spark, fileType).readBinaryFile(path)
          .map { case (filename, file) => new BINReader(filename, file).process }
    }
}
```
However, I am not satisfied with this solution, because there is a lot of code duplication at the end, and I would like a design that decides implicitly which reader to use. Ultimately, I want to instantiate a class GenericReader:
```scala
class GenericReader(spark: SparkSession, fileType: String) extends Serializable {
  def read(path: String): RDD[Result] = ???
}
```
Then, when I call `new GenericReader(spark, "csv").read("myPath")`, it can derive that it must read the data with `spark.sparkContext.wholeTextFiles` and apply the CSVReader. Or, when it is `new GenericReader(spark, "bin").read("myPath")`, it creates the RDD with `spark.sparkContext.binaryFiles` and applies the BINReader.

I also tried to apply the implicit design pattern, but without success. How can I solve this more elegantly?
I would prefer a type class approach:

First, the extra types needed for the example:
```scala
object types {
  type Result = List[String]
  type PortableDataStream = String
}
import types._
```
Create a sealed trait (a coproduct):
```scala
sealed trait InputFileType
case object CSV extends InputFileType
case object JSON extends InputFileType
case object BIN extends InputFileType
```
Create the Reader type class, expressed as a trait with a type parameter:
```scala
trait Reader[A] {
  def doStep1: Seq[String]
  def process: Result
}
```
Give it a companion object containing a "summoner" method, along with the instances available at compile time:
```scala
object Reader {
  def apply[A <: InputFileType](implicit reader: Reader[A]): Reader[A] = reader

  implicit object CSVReader extends Reader[CSV.type] {
    override def doStep1: Seq[String] = {
      val list = Seq("I am csvReader")
      println(list)
      list
    }
    override def process: Result = List("Result Csv")
  }

  implicit object JSONReader extends Reader[JSON.type] {
    override def doStep1: Seq[String] = {
      val list = Seq("I am jsonReader")
      println(list)
      list
    }
    override def process: Result = List("Result Json")
  }

  implicit object BINReader extends Reader[BIN.type] {
    override def doStep1: Seq[String] = {
      val list = Seq("I am binReader")
      println(list)
      list
    }
    override def process: Result = List("Result Bin")
  }
}
```
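With the summoner in place, an instance can be fetched by type alone, without ever naming the instance object. Here is a minimal self-contained sketch (trimmed to one file type, with `process` simplified to a single method):

```scala
sealed trait InputFileType
case object CSV extends InputFileType

trait Reader[A] { def process: List[String] }
object Reader {
  // "summoner": Reader[A] fetches whichever implicit instance is in scope
  def apply[A](implicit reader: Reader[A]): Reader[A] = reader

  implicit object CSVReader extends Reader[CSV.type] {
    override def process: List[String] = List("Result Csv")
  }
}

println(Reader[CSV.type].process)  // List(Result Csv)
```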
Your GenericReader can then be defined as:
```scala
class GenericReader[A](implicit reader: Reader[A]) {
  def process = reader.doStep1
}
```
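An equivalent way to write GenericReader is with a context bound, `[A: Reader]`, which is syntactic sugar for the implicit parameter list. A minimal self-contained sketch (again trimmed to one file type and a single `process` method):

```scala
sealed trait InputFileType
case object JSON extends InputFileType

trait Reader[A] { def process: List[String] }
object Reader {
  def apply[A](implicit reader: Reader[A]): Reader[A] = reader

  implicit object JSONReader extends Reader[JSON.type] {
    override def process: List[String] = List("Result Json")
  }
}

// [A: Reader] desugars to (implicit evidence: Reader[A]);
// the summoner Reader[A] retrieves that evidence inside the class
class GenericReader[A: Reader] {
  def process: List[String] = Reader[A].process
}

println(new GenericReader[JSON.type]().process)  // List(Result Json)
```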
Test code:
```scala
object SparkReader {
  import Reader._

  def main(args: Array[String]): Unit = {
    new GenericReader[CSV.type]().process
    new GenericReader[JSON.type]().process
    new GenericReader[BIN.type]().process
  }
}
```
This prints:
```
List(I am csvReader)
List(I am jsonReader)
List(I am binReader)
```
There is plenty of information available on how to use type classes in Scala, Scalaz, and Cats.