我使用Kafka Connect HDFS Sink提取了HDFS中保存的AVRO文件。
这是架构注册表项的一部分:
{
"name": "TAX_RATE",
"type": ["null", {
"type": "bytes",
"scale": 127,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "127"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}],
"default": null
}, .....,
}
[当我创建一个读取这些文件的DataFrame时,“ TAX_RATE”列被读为'binary'列。
现在,我想将“二进制”字段转换为在“ logicalType”元数据字段中定义的类型。
为此,我想使用AVRO的元数据。
但是当我尝试从DataFrame架构中读取元数据时,它为空。
val incr_df = spark
.read
.format("avro")
.load("/tmp/ash/binaryToDecimal/partition=2020-04-22-01")
incr_df.printSchema
incr_df.schema.fields.map(f=> f.metadata)
输出:
res7:Array [org.apache.spark.sql.types.Metadata] = Array({},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}} ,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}} ,{},{},{},{},{},{},{},{},{})]
是否有某种方法或设置可以让我从DataFrame架构中读取AVRO元数据?
尝试:
val m = df2.schema.fields
val s = df2.schema.size
val f = df2.schema.fieldNames
我想你想要第一个。