Providing a schema when reading Avro files with PySpark


I'm trying to read an Avro file with PySpark, and I want to supply my own schema while reading it. Here is my sample code:

import json

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

json_schema = """
{
    "type": "record",
    "name": "User",
    "fields": [
        {
            "name": "routingNumber",
            "type": "string"
        }
    ]
}
"""

schema_dict = json.loads(json_schema)
avro_schema = StructType.fromJson(schema_dict)  # fails here with KeyError: 'nullable'

spark = SparkSession.builder.appName("AvroReadExample")\
        .config('spark.jars', '/Users/harbeerkadian/Documents/workspace/learn-spark/jars/spark-avro_2.12-3.5.0.jar')\
        .getOrCreate()
df = spark.read.format("avro").schema(avro_schema).load("/Users/harbeerkadian/Downloads/accounts.avro")

I get an error when I try to convert the schema JSON into a StructType.

Here are the error details:

Traceback (most recent call last):
  File "/Users/harbeerkadian/Documents/workspace/learn-python/solution.py", line 992, in <module>
    avro_using_files()
  File "/Users/harbeerkadian/Documents/workspace/learn-python/solution.py", line 954, in avro_using_files
    avro_schema = StructType.fromJson(schema_dict)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/harbeerkadian/Documents/workspace/python_env/ncr-env/lib/python3.11/site-packages/pyspark/sql/types.py", line 1017, in fromJson
    return StructType([StructField.fromJson(f) for f in json["fields"]])
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/harbeerkadian/Documents/workspace/python_env/ncr-env/lib/python3.11/site-packages/pyspark/sql/types.py", line 1017, in <listcomp>
    return StructType([StructField.fromJson(f) for f in json["fields"]])
                       ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/harbeerkadian/Documents/workspace/python_env/ncr-env/lib/python3.11/site-packages/pyspark/sql/types.py", line 709, in fromJson
    json["nullable"],
    ~~~~^^^^^^^^^^^^
KeyError: 'nullable'
python apache-spark pyspark schema avro
Answer

The problem is in how you defined the `json_schema` variable. `StructType.fromJson` requires certain keys on every field, and because `json_schema` is missing them, you get a `KeyError`.

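In general, JSON that you pass to `StructType.fromJson` should follow Spark's own schema format: an object with "type": "struct" and a "fields" array holding one entry per column (add more entries as needed):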

{
    "type": "struct",
    "fields": [
        {
            "name": "fieldName1",
            "type": "string",
            "nullable": true,
            "metadata": {}
        },
        {
            "name": "fieldName2",
            "type": "integer",
            "nullable": false,
            "metadata": {}
        }
    ]
}

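When given an Avro schema, `StructType.fromJson` only looks at the `fields` array and ignores Avro's top-level keys such as "type": "record" and "name". Each entry in `fields` is converted into a `StructField`, and every `StructField` needs the keys `name`, `type`, `nullable` and `metadata`.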
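As a stdlib-only illustration of why the question's schema fails, the sketch below lists, for each field, which of those four keys is absent. `missing_field_keys` is a hypothetical helper written for this example; it only mimics the key lookups visible in the traceback and is not PySpark's implementation.

```python
import json

# The four keys the answer says every StructField entry needs. This sketch only
# mimics the lookups seen in the traceback; it is not PySpark's implementation.
REQUIRED_FIELD_KEYS = ("name", "type", "nullable", "metadata")


def missing_field_keys(schema: dict) -> list:
    """Return (field name, missing key) pairs for every field in schema["fields"]."""
    missing = []
    for field in schema["fields"]:
        for key in REQUIRED_FIELD_KEYS:
            if key not in field:
                missing.append((field.get("name"), key))
    return missing


# The schema from the question: valid Avro, but not Spark's JSON schema format.
avro_schema_json = """
{
    "type": "record",
    "name": "User",
    "fields": [{"name": "routingNumber", "type": "string"}]
}
"""

print(missing_field_keys(json.loads(avro_schema_json)))
# [('routingNumber', 'nullable'), ('routingNumber', 'metadata')]
```

Both keys are missing, but PySpark reads them one at a time and stops at the first absent one, which is why the traceback reports 'nullable' rather than 'metadata'.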

So the `json_schema` variable should look like this:

json_schema = """
{
    "type": "record",
    "name": "User",
    "fields": [
        {
            "name": "routingNumber",
            "type": "string",
            "nullable": true,
            "metadata": {}
        }
    ]
}
"""

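I assumed the `routingNumber` column can contain null values, hence "nullable": true; change it to false if it cannot. With this change, `StructType.fromJson` should succeed, and the resulting schema can be passed to `.schema()` when reading the file.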
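If you already have an Avro schema and would rather not add the Spark-specific keys by hand, a small converter can generate Spark's JSON format from it. The sketch below uses a hypothetical helper, `avro_record_to_spark_json` (not part of PySpark), and assumes a flat record whose fields are Avro primitives or ["null", <primitive>] unions; nested records, arrays, maps and logical types are deliberately rejected.

```python
import json

# Assumed mapping from Avro primitive type names to the JSON type names that
# PySpark's StructType.fromJson accepts (primitives only, for illustration).
AVRO_TO_SPARK = {
    "string": "string",
    "int": "integer",
    "long": "long",
    "float": "float",
    "double": "double",
    "boolean": "boolean",
    "bytes": "binary",
}


def avro_record_to_spark_json(avro_schema: dict) -> dict:
    """Convert a flat Avro record schema dict into Spark's StructType JSON format.

    Hypothetical helper: supports primitive fields and ["null", <primitive>] unions only.
    """
    fields = []
    for field in avro_schema["fields"]:
        avro_type = field["type"]
        nullable = False
        # A union that includes "null" marks an optional (nullable) column.
        if isinstance(avro_type, list):
            non_null = [t for t in avro_type if t != "null"]
            if len(non_null) != 1:
                raise ValueError(f"unsupported union for {field['name']!r}: {avro_type}")
            nullable = "null" in avro_type
            avro_type = non_null[0]
        # Complex types (dicts) and unknown names are not handled by this sketch.
        if not isinstance(avro_type, str) or avro_type not in AVRO_TO_SPARK:
            raise ValueError(f"unsupported type for {field['name']!r}: {avro_type}")
        fields.append({
            "name": field["name"],
            "type": AVRO_TO_SPARK[avro_type],
            "nullable": nullable,
            "metadata": {},
        })
    return {"type": "struct", "fields": fields}


# The Avro schema from the question.
question_schema = json.loads("""
{
    "type": "record",
    "name": "User",
    "fields": [{"name": "routingNumber", "type": "string"}]
}
""")

print(json.dumps(avro_record_to_spark_json(question_schema)))
# {"type": "struct", "fields": [{"name": "routingNumber", "type": "string", "nullable": false, "metadata": {}}]}
```

The returned dict can be passed to `StructType.fromJson` and then to `spark.read.format("avro").schema(...)`. This sketch follows Avro semantics, where a plain "string" field cannot be null, so it sets "nullable": false instead of the answer's assumed true. Alternatively, the spark-avro data source can take the original Avro schema string directly through its `avroSchema` read option, e.g. `spark.read.format("avro").option("avroSchema", json_schema).load(path)`, which avoids the conversion altogether.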
