在Apache Flink中读取超过22列的CSV

问题描述 投票:0回答:1

我到目前为止所做的是读取CSV如下:

val data = env.readCsvFile[ElecNormNew](getClass.getResource("/elecNormNew.arff").getPath)

val dataSet = data map { tuple =>
      val list = tuple.productIterator.toList
      val numList = list map (_.asInstanceOf[Double])
      LabeledVector(numList(8), DenseVector(numList.take(8).toArray))
    }

ElecNorNewcase class的地方:

case class ElecNormNew(
  var date: Double,
  var day: Double,
  var period: Double,
  var nswprice: Double,
  var nswdemand: Double,
  var vicprice: Double,
  var vicdemand: Double,
  var transfer: Double,
  var label: Double) extends Serializable {
}

Flink's docs中所述。但现在我正在尝试读取包含53列的CSV。有没有办法自动化这个过程?我需要创建一个包含53个字段的POJO吗?

更新

在Fabian回答之后,我正在尝试这个:

val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)
  val rowIF = new RowCsvInputFormat(new Path(getClass.getResource("/lungcancer.csv").getPath), fieldTypes)
  val csvData: DataSet[Row] = env.createInput[Row](rowIF)
  val dataSet2 = csvData.map { tuple =>
      ???
  }

但不知道如何继续,我怎么假设使用RowTypeInfo

scala csv apache-flink
1个回答
2
投票

您可以使用RowCsvInputFormat如下:

val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.INT)

val rowIF = new RowCsvInputFormat(new Path("file:///myCsv"), fieldTypes)
val csvData: DataSet[Row] = env.createInput[Row](rowIF)

Row将数据存储在Array[Any]中。因此,Flink无法自动推断出Row的字段类型。这比使用类型化的元组或案例类更难使用。您需要明确地为RowTypeInfo提供正确的类型。这可以作为隐式值或通过扩展ResultTypeQueryable接口的函数来完成。

© www.soinside.com 2019 - 2024. All rights reserved.