Creating a DataFrame from headerless Avro with a nested array


I am trying to read a headerless Avro file together with an external schema file. The data contains a nested array of structs, and while I can deserialize the records, building a Spark DataFrame fails when it reaches the nested array. The schema, as printed by printSchema, is:

root
 |-- wmSubRegionName: string (nullable = true)
 |-- wmInstrumentIdentification: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- instrumentIdentificationType: string (nullable = true)
 |    |    |-- instrumentIdentifier: string (nullable = true)
 |    |    |-- classificationID: string (nullable = true)
 |    |    |-- instrumentIdentifierSuffix: string (nullable = true)
 |-- wmManagementStyleName: string (nullable = true)
 |-- wmCapitalizationName: string (nullable = true)
 |-- wmIndustryGroupName: string (nullable = true)

Error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.MapObjects_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:289)
  ... 25 more
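For context, the error is easy to reproduce in isolation (a minimal sketch, not from the original post): Spark's external type for an ArrayType column is a Seq, so handing the encoder a java.lang.String where the schema says array fails at row-encoding time, exactly as in the stack trace above.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // Hypothetical minimal repro: the schema declares an array,
    // but the Row carries a plain String in that position
    val arraySchema = StructType(Seq(StructField("arr", ArrayType(StringType))))
    val badRows = sc.parallelize(Seq(Row("not-an-array")))
    sqlContext.createDataFrame(badRows, arraySchema).show()
    // => java.lang.RuntimeException: java.lang.String is not a valid external type for schema of array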

Code snippet:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import com.databricks.spark.avro.SchemaConverters

// Read the external Avro schema (one JSON document per file)
val schemajson = sc.textFile("schema.json").glom().map(_.mkString(" ")).first()

// Read each headerless Avro file as a single byte array, keyed by path
val byteRDD = sc.binaryFiles("*.avro").map(record => (record._1, record._2.toArray()))

val deserialisedAvroRDD = byteRDD.map { record =>
  // Parse the schema inside the closure so it is not captured from the driver
  val schema = new Schema.Parser().parse(schemajson)
  val datumReader = new GenericDatumReader[GenericRecord](schema)
  val decoder = DecoderFactory.get.binaryDecoder(record._2, null)

  import scala.collection.mutable.ArrayBuffer
  val dataRecords = ArrayBuffer[GenericRecord]()
  // A headerless file is just concatenated records; read until the bytes run out.
  // Pass null as the reuse argument so each read returns a fresh record;
  // reusing the previous one would leave every buffered entry pointing at the same object.
  while (!decoder.isEnd()) {
    dataRecords += datumReader.read(null, decoder)
  }
  (record._1, dataRecords.toArray)
}.flatMap(_._2)

// Convert each GenericRecord into a Row matching the SQL type derived from the schema
val avroRDDtoSQL = deserialisedAvroRDD.map { genericrecord =>
  val schema = new Schema.Parser().parse(schemajson)
  val sqlType = SchemaConverters.toSqlType(schema)
  val objectArray = new Array[Any](genericrecord.getSchema.getFields.size)
  import scala.collection.JavaConverters._
  for (field <- genericrecord.getSchema.getFields.asScala) {
    // NOTE: this stringifies every field, including the nested
    // wmInstrumentIdentification array of structs -- this is where the row
    // stops matching the ArrayType in the SQL schema (see the sketch below)
    objectArray(field.pos) = String.valueOf(genericrecord.get(field.pos))
  }
  new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
}
val schemaDf = new Schema.Parser().parse(schemajson)
val sqlTypeDf = SchemaConverters.toSqlType(schemaDf).dataType.asInstanceOf[StructType]
val df = sqlContext.createDataFrame(avroRDDtoSQL.asInstanceOf[org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]], sqlTypeDf)
df.show()  // fails with the RuntimeException above

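The root cause appears to be the String.valueOf(genericrecord.get(field.pos)) call: it flattens every field, including the wmInstrumentIdentification array of structs, into a java.lang.String, while the StructType produced by SchemaConverters.toSqlType expects a Seq of Rows in that position. Below is a minimal sketch of a recursive converter that preserves nested structure (avroToRowValue is a hypothetical helper, not part of spark-avro; the union branch assumes the usual ["null", T] nullable pattern):

    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericRecord
    import org.apache.spark.sql.Row
    import scala.collection.JavaConverters._

    // Hypothetical helper: map an Avro value to the external type Spark expects.
    // Records become Rows, arrays become Seqs, Utf8/enum values become Strings;
    // other primitives pass through unchanged.
    def avroToRowValue(value: Any, schema: Schema): Any = schema.getType match {
      case Schema.Type.RECORD =>
        val rec = value.asInstanceOf[GenericRecord]
        Row.fromSeq(schema.getFields.asScala.map(f => avroToRowValue(rec.get(f.pos), f.schema)))
      case Schema.Type.ARRAY =>
        value.asInstanceOf[java.util.Collection[Any]].asScala.toSeq
          .map(avroToRowValue(_, schema.getElementType))
      case Schema.Type.UNION =>
        // assumes a nullable union like ["null", T]: recurse on the non-null branch
        if (value == null) null
        else avroToRowValue(value, schema.getTypes.asScala.find(_.getType != Schema.Type.NULL).get)
      case Schema.Type.STRING | Schema.Type.ENUM =>
        // Avro hands back org.apache.avro.util.Utf8; Spark wants java.lang.String
        if (value == null) null else value.toString
      case _ => value
    }

    val fixedRows = deserialisedAvroRDD.map { rec =>
      val schema = new Schema.Parser().parse(schemajson)
      Row.fromSeq(schema.getFields.asScala.map(f => avroToRowValue(rec.get(f.pos), f.schema)))
    }
    val fixedDf = sqlContext.createDataFrame(fixedRows, sqlTypeDf)
    fixedDf.show()

With the nested values preserved as Rows and Seqs, the row shape matches sqlTypeDf, so the generated UnsafeProjection no longer sees a String where it expects an array.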

dataframe apache-spark avro spark-avro avro-tools