Spark无法从二进制文件读取所有记录

问题描述 投票:0回答:1

我正在尝试从S3读取Avro文件,并且如此spark documentation所示,我可以正常读取它。我的文件如下所示,这些文件各包含5000条记录。

s3a://bucket/part-0.avro
s3a://bucket/part-1.avro
s3a://bucket/part-2.avro

val byteRDD: RDD[Array[Byte]] = sc.binaryFiles(s"$s3URL/*.avro").map{ case(file, pds) => {
  val dis = pds.open()
  val len = dis.available()
  val buf = Array.ofDim[Byte](len)
  pds.open().readFully(buf)
  buf
}}

import org.apache.avro.io.DecoderFactory
val deserialisedAvroRDD = byteRDD.map(record => {

  import org.apache.avro.Schema
  val schema = new Schema.Parser().parse(schemaJson)
  val datumReader = new GenericDatumReader[GenericRecord](schema)

  val decoder = DecoderFactory.get.binaryDecoder(record, null)
  var datum: GenericRecord = null
  while (!decoder.isEnd()) {
    datum = datumReader.read(datum, decoder)
  }
  datum
}
)

deserialisedAvroRDD.count() ---> 3

我正在反序列化binaryAvro消息以生成GenericRecords,并且我期望反序列化的RDD具有15k记录,因为每个.avro文件具有5k记录,但是反序列化之后,我仅得到3条记录。有人可以帮忙找出与我的代码有关的问题吗?我如何一次序列化一个记录。

apache-spark deserialization avro binaryfiles spark-avro
1个回答
1
投票

这应该工作

val recRDD: RDD[GenericRecord] = sc.binaryFiles(s"$s3URL/*.avro").flatMap {
  case (file, pds) => {
    val schema =  new Schema.Parser().parse(schemaJson)
    val datumReader = new GenericDatumReader[GenericRecord](schema)

    val decoder = DecoderFactory.get.binaryDecoder(pds.toArray(), null)
    var datum: GenericRecord = null
    val out = ArrayBuffer[GenericRecord]()
    while (!decoder.isEnd()) {
      out += datumReader.read(datum, decoder)
    }
    out
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.