我正在尝试读取一个具有外部模式文件的无头 avro 文件。无头 avro 有一个结构类型的嵌套数组。我正在尝试将它读入 spark 数据帧并在解析嵌套数组部分时面临问题
root
|-- wmSubRegionName: string (nullable = true)
|-- wmInstrumentIdentification: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- instrumentIdentificationType: string (nullable = true)
| | |-- instrumentIdentifier: string (nullable = true)
| | |-- classificationID: string (nullable = true)
| | |-- instrumentIdentifierSuffix: string (nullable = true)
|-- wmManagementStyleName: string (nullable = true)
|-- wmCapitalizationName: string (nullable = true)
|-- wmIndustryGroupName: string (nullable = true)
错误
Caused by: java.lang.RuntimeException: java.lang.String 不是数组模式的有效外部类型
代码片段
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.avro.generic.GenericDatumReader
import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
import org.apache.avro.io.DecoderFactory
//read avro schemajson
val schemajson = sc.textFile("schema.json").glom().map(record=>record.mkString(" ")).first()
val byteRDD = sc.binaryFiles("*.avro").map(record=> {(record._1,record._2.toArray())})
val deserialisedAvroRDD = byteRDD.map(record=> {
import org.apache.avro.Schema
val schema = new Schema.Parser().parse(schemajson)
val datumReader = new GenericDatumReader[GenericRecord](schema)
val decoder = DecoderFactory.get.binaryDecoder(record._2, null)
var datum:GenericRecord = null
import scala.collection.mutable.ArrayBuffer
var dataRecords = ArrayBuffer[GenericRecord]()
while(!decoder.isEnd()) {
datum = datumReader.read(datum,decoder);
dataRecords += datum
}
(record._1,dataRecords.toArray)
}
).flatMap(record=>record._2)
//generate sql types
val avroRDDtoSQL = deserialisedAvroRDD.map(genericrecord=>{
val schema = new Schema.Parser().parse(schemajson)
val sqlType = SchemaConverters.toSqlType(schema)
val objectArray = new Array[Any](genericrecord.asInstanceOf[GenericRecord].getSchema.getFields.size)
import scala.collection.JavaConversions._
for (field <- genericrecord.getSchema.getFields) {
objectArray(field.pos) = String.valueOf(genericrecord.get(field.pos))
}
val r:GenericRowWithSchema = new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
r
})
val schemaDf = new Schema.Parser().parse(schemajson)
val sqlTypeDf = SchemaConverters.toSqlType(schemaDf).dataType.asInstanceOf[StructType]
val df= sqlContext.createDataFrame(avroRDDtoSQL.asInstanceOf[org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]],sqlTypeDf)
df.show()
enter code here