我有 JSON 格式的数据。有时,数据中可能会缺少一些必需的列。我正在使用架构来提供 JSON 的结构。这修复了嵌套结构中缺失的字段(它添加了
NULL
值并且一切正常)。但是,如果有未包含在架构中的新列,则读取会失败。
有没有一种方法可以仅使用架构来为工作提供必要的结构,同时允许一些架构与我不关心的新字段发生漂移?
编辑:我被要求提供一个最小的例子,我现在就这样做:
假设我有一个模式
root
|-- application_info: struct (nullable = true)
| |-- os: string (nullable = true)
| |-- version: string (nullable = true)
|-- device_info: struct (nullable = true)
| |-- bluetooth_enabled: boolean (nullable = true)
| |-- bluetooth_version: string (nullable = true)
这是我在收到的数据中强制执行的内容。因此,如果记录仅包含 application_info,则模式将强制使用所有嵌套结构的 device_info(使用 NULL,因此不会破坏下游任务)
对于这种情况,它工作得很好,但如果记录添加了一个字段,例如,bluetooth_parameters:字符串,或者在我的情况下,某个字段埋在嵌套深处,那么应用数据模式将失败。
我最初所做的是对所有数据进行采样,以便获得尽可能完整的模式,但这要么效率低下,要么不正确(取决于您的采样方式)
我想要的是一个能够解决非结构化数据这两个问题的解决方案
令我惊讶的是,当我开始制作示例时,我意识到 Spark 可以很好地处理这些模式漂移。我发现的唯一问题是,如果父对象丢失,它不会重新创建整个嵌套结构,而是将其替换为 NULL
分享代码以防万一有人想尝试:
from pyspark.sql.types import StructField, StructType, StringType, BooleanType, Row
import json
#Schema test
schema = StructType([
StructField("application_info", StructType([
StructField("os", StringType(), True),
StructField("version", StringType(), True)
]), True),
StructField("device_info", StructType([
StructField("bluetooth_enabled", BooleanType(), True),
StructField("bluetooth_version", StringType(), True)
]), True)
])
record_missing_bt_version = {
"application_info": {"os": "iOS", "version": "14"} # Device info gets replaced by NULL
}
record_additional_bt_info = {
"application_info": {"os": "iOS", "version": "14"},
"device_info": {"bluetooth_enabled": True, "bluetooth_version": "2.0", "bluetooth_parameters": "Bitrate - 2.1MBit/s"} #bluetooth_parameters is not included
}
record_additional_info = {
"application_info": {"os": "iOS", "version": "14"},
"device_info": {"bluetooth_enabled": True, "bluetooth_version": "2.0"},
"other_info": {"Install": "Success"} # other_info Does not get included
}
json_records = [json.dumps(record) for record in [record_missing_bt_version, record_additional_bt_info, record_additional_info]]
rdd = spark.sparkContext.parallelize(json_records)
df = spark.read.json(rdd, schema=schema)
df.show(truncate=False)
结果:
+----------------+-----------+
|application_info|device_info|
+----------------+-----------+
|{iOS, 14} |null |
|{iOS, 14} |{true, 2.0}|
|{iOS, 14} |{true, 2.0}|
+----------------+-----------+