I'm trying to read an Avro file with PySpark, and I want to supply my own schema when reading it. Here is the sample code:
import json

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

json_schema = """
{
"type": "record",
"name": "User",
"fields": [
{
"name": "routingNumber",
"type": "string"
}
]
}
"""
schema_dict = json.loads(json_schema)
avro_schema = StructType.fromJson(schema_dict)
spark = SparkSession.builder.appName("AvroReadExample")\
.config('spark.jars', '/Users/harbeerkadian/Documents/workspace/learn-spark/jars/spark-avro_2.12-3.5.0.jar')\
.getOrCreate()
df = spark.read.format("avro").schema(avro_schema).load("/Users/harbeerkadian/Downloads/accounts.avro")
I get an error when I try to convert the schema JSON to a StructType.
Here are the error details:
Traceback (most recent call last):
File "/Users/harbeerkadian/Documents/workspace/learn-python/solution.py", line 992, in <module>
avro_using_files()
File "/Users/harbeerkadian/Documents/workspace/learn-python/solution.py", line 954, in avro_using_files
avro_schema = StructType.fromJson(schema_dict)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/harbeerkadian/Documents/workspace/python_env/ncr-env/lib/python3.11/site-packages/pyspark/sql/types.py", line 1017, in fromJson
return StructType([StructField.fromJson(f) for f in json["fields"]])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/harbeerkadian/Documents/workspace/python_env/ncr-env/lib/python3.11/site-packages/pyspark/sql/types.py", line 1017, in <listcomp>
return StructType([StructField.fromJson(f) for f in json["fields"]])
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/harbeerkadian/Documents/workspace/python_env/ncr-env/lib/python3.11/site-packages/pyspark/sql/types.py", line 709, in fromJson
json["nullable"],
~~~~^^^^^^^^^^^^
KeyError: 'nullable'
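The problem is the json_schema variable you defined. StructType.fromJson does not parse an Avro schema. It expects Spark's own JSON schema format, in which every field definition must carry certain mandatory keys. Your json_schema fields are missing them, so StructField.fromJson raises KeyError: 'nullable'.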
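To see which keys are missing before Spark raises the KeyError, a small pure-Python check can help. The sketch below uses a hypothetical helper, find_missing_keys (not a PySpark API), and assumes that the four keys listed in this answer (name, type, nullable, metadata) are the ones StructField.fromJson reads. It does not need Spark installed.

```python
import json

# Keys this answer lists as required for every entry of the "fields" array.
REQUIRED_FIELD_KEYS = ("name", "type", "nullable", "metadata")


def find_missing_keys(schema: dict) -> dict:
    """Map each field name to the required keys it lacks; an empty dict means none are missing."""
    missing = {}
    for index, field in enumerate(schema.get("fields", [])):
        absent = [key for key in REQUIRED_FIELD_KEYS if key not in field]
        if absent:
            # Fall back to the field's position if it has no name at all.
            missing[field.get("name", f"<field {index}>")] = absent
    return missing


# The schema from the question, which triggers KeyError: 'nullable'.
json_schema = """
{"type": "record", "name": "User",
 "fields": [{"name": "routingNumber", "type": "string"}]}
"""
print(find_missing_keys(json.loads(json_schema)))
```

For the question's schema this reports that routingNumber lacks nullable and metadata.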
In general, when you define a schema as JSON to be passed to StructType.fromJson, each entry in its fields array should follow this structure (add more field objects in the same form as needed; JSON itself does not allow comments):
[
{
"name": "fieldName1",
"type": "string",
"nullable": true,
"metadata": {}
},
{
"name": "fieldName2",
"type": "integer",
"nullable": false,
"metadata": {}
}
]
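For your Avro-style schema, StructType.fromJson only reads the fields array (the top-level type and name keys are ignored) and converts each entry into a StructField. Each StructField requires the name, type, nullable and metadata keys.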
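If you would rather keep writing Avro-style schemas, you could convert them into Spark's JSON format before calling StructType.fromJson. The sketch below is a hypothetical helper (avro_record_to_spark_json and AVRO_TO_SPARK are illustrative names, not part of PySpark or spark-avro). It assumes a flat record whose fields are Avro primitive types or unions of exactly one primitive with "null". It marks a field nullable only when its Avro type is such a union, and it rejects anything else.

```python
import json

# Assumed mapping from Avro primitive type names to Spark's JSON type names (primitives only).
AVRO_TO_SPARK = {
    "string": "string",
    "int": "integer",
    "long": "long",
    "float": "float",
    "double": "double",
    "boolean": "boolean",
    "bytes": "binary",
}


def avro_record_to_spark_json(avro_schema: dict) -> dict:
    """Convert a flat Avro record schema into the dict shape StructType.fromJson expects."""
    fields = []
    for field in avro_schema["fields"]:
        avro_type = field["type"]
        nullable = False
        # Avro expresses optional fields as a union with "null", e.g. ["null", "string"].
        if isinstance(avro_type, list):
            non_null = [t for t in avro_type if t != "null"]
            if len(non_null) != 1:
                raise ValueError(f"unsupported union for field {field['name']!r}: {avro_type}")
            nullable = "null" in avro_type
            avro_type = non_null[0]
        # Complex types (records, arrays, maps, ...) are out of scope for this sketch.
        if not isinstance(avro_type, str) or avro_type not in AVRO_TO_SPARK:
            raise ValueError(f"unsupported Avro type for field {field['name']!r}: {avro_type}")
        fields.append({
            "name": field["name"],
            "type": AVRO_TO_SPARK[avro_type],
            "nullable": nullable,
            "metadata": {},
        })
    return {"type": "struct", "fields": fields}


user_schema = {
    "type": "record",
    "name": "User",
    "fields": [{"name": "routingNumber", "type": "string"}],
}
print(json.dumps(avro_record_to_spark_json(user_schema)))
```

You could then pass the returned dict to StructType.fromJson. Unlike the hand-edited schema in this answer, a plain "string" field becomes nullable: false here, which mirrors Avro's semantics. Declare the field as ["null", "string"] if it can hold nulls.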
So the json_schema variable should look like this:
json_schema = """
{
"type": "record",
"name": "User",
"fields": [
{
"name": "routingNumber",
"type": "string",
"nullable": true,
"metadata": {}
}
]
}
"""
I'm assuming the routingNumber column can contain null values, hence "nullable": true. Change it if that is not the case. With this change, the code should work.