有没有办法为 Spark DataFrame 指定模式,同时允许添加新字段?

问题描述 投票:0回答:1

我有 JSON 格式的数据。有时,数据中可能会缺少一些必需的列。我正在使用架构来提供 JSON 的结构。这修复了嵌套结构中缺失的字段(它添加了

NULL
值并且一切正常)。但是,如果有未包含在架构中的新列,则读取会失败。

有没有一种方法可以仅使用架构来为工作提供必要的结构,同时允许一些架构与我不关心的新字段发生漂移?

编辑:我被要求提供一个最小的例子,我现在就这样做:

假设我有一个模式

root
 |-- application_info: struct (nullable = true)
 |    |-- os: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- device_info: struct (nullable = true)
 |    |-- bluetooth_enabled: boolean (nullable = true)
 |    |-- bluetooth_version: string (nullable = true)

这是我在收到的数据中强制执行的内容。因此,如果记录仅包含 application_info,则模式将强制使用所有嵌套结构的 device_info(使用 NULL,因此不会破坏下游任务)

对于这种情况,它工作得很好,但如果记录添加了一个字段,例如,bluetooth_parameters:字符串,或者在我的情况下,某个字段埋在嵌套深处,那么应用数据模式将失败。

我最初所做的是对所有数据进行采样,以便获得尽可能完整的模式,但这要么效率低下,要么不正确(取决于您的采样方式)

我想要的是一个能够解决非结构化数据这两个问题的解决方案

apache-spark pyspark schema jsonschema
1个回答
0
投票

令我惊讶的是,当我开始制作示例时,我意识到 Spark 可以很好地处理这些模式漂移。我发现的唯一问题是,如果父对象丢失,它不会重新创建整个嵌套结构,而是将其替换为 NULL

分享代码以防万一有人想尝试:

from pyspark.sql.types import StructField, StructType, StringType, BooleanType, Row
import json
#Schema test
schema = StructType([
    StructField("application_info", StructType([
        StructField("os", StringType(), True),
        StructField("version", StringType(), True)
    ]), True),
    StructField("device_info", StructType([
        StructField("bluetooth_enabled", BooleanType(), True),
        StructField("bluetooth_version", StringType(), True)
    ]), True)
])
record_missing_bt_version = {
    "application_info": {"os": "iOS", "version": "14"} # Device info gets replaced by NULL
}

record_additional_bt_info = {
    "application_info": {"os": "iOS", "version": "14"},
    "device_info": {"bluetooth_enabled": True, "bluetooth_version": "2.0", "bluetooth_parameters": "Bitrate - 2.1MBit/s"} #bluetooth_parameters is not included
}

record_additional_info = {
    "application_info": {"os": "iOS", "version": "14"},
    "device_info": {"bluetooth_enabled": True, "bluetooth_version": "2.0"},
    "other_info": {"Install": "Success"} # other_info Does not get included
}

json_records = [json.dumps(record) for record in [record_missing_bt_version, record_additional_bt_info, record_additional_info]]
rdd = spark.sparkContext.parallelize(json_records)
df = spark.read.json(rdd, schema=schema)

df.show(truncate=False)

结果:

+----------------+-----------+
|application_info|device_info|
+----------------+-----------+
|{iOS, 14}       |null       |
|{iOS, 14}       |{true, 2.0}|
|{iOS, 14}       |{true, 2.0}|
+----------------+-----------+
© www.soinside.com 2019 - 2024. All rights reserved.