如何使用嵌套列表和字典加载JSON对象?

问题描述 投票:1回答:1

我有JSON对象,采取以下形式:

{
    "docId" : "1",
    "links" : {
        "a link": ["endpoint 1", "endpoint 2"],
        "another link": ["endpoint 3"] 
    },
    "authors" : ["Thomas", "Peter"],
    "text": "This is the document text!"    
}

为了使用SparkSession.read.json加载它,我将所有这些JSON对象写入一个文件concatenated.txt,其中每行代表一个完整的文档:

{"docId": "1", ...}
{"docId": "2", ...}
{"docId": "3", ...}

问题是如果我跑

df_data = spark.read.json('concatenated.txt')

它不起作用,因为我遇到了java.lang.OutOfMemoryError。我不明白确切的问题,但我想它只是无法推断出正确的类型。出于这个原因,我将以下模式定义为解决方法。到目前为止这是有效的:

schema = StructType([
        StructField("docId", StringType(), True),
        StructField("links", StringType(), True),
        StructField("authors", StringType(), True),
        StructField("text", StringType(), True)                              
])

df_data = spark.read.json('concatenated.txt', schema=schema)

但当然,这要求我做一些事情:

import json 
# ..
df_data.flatMap(lambda x: json.loads(x.authors))

为了拥有实际的底层对象。

我的问题是如何加载JSON数据,以便所有复杂对象实际上都是在加载时构造的。所以authors应该例如总是包含list(或None),而linksdict,其值也是list类型。


样本数据:

{ "docId" : "1", "links" : { "a link": ["endpoint 1", "endpoint 2"], "another link": ["endpoint 3"] }, "authors" : ["Thomas", "Peter"], "text": "This is the document text!" }
{ "docId" : "2", "links" : { "a link": ["endpoint 1", "endpoint 2"], "another link": ["endpoint 3"] }, "authors" : ["Thomas", "Peter"], "text": "This is the document text!" }
{ "docId" : "3", "links" : { "a link": ["endpoint 1", "endpoint 2"], "another link": ["endpoint 3"] }, "authors" : ["Thomas", "Peter"], "text": "This is the document text!" }
{ "docId" : "4", "links" : { "a link": ["endpoint 1", "endpoint 2"], "another link": ["endpoint 3"] }, "authors" : ["Thomas", "Peter"], "text": "This is the document text!" }
{ "docId" : "5", "links" : { "a link": ["endpoint 1", "endpoint 2"], "another link": ["endpoint 3"] }, "authors" : ["Thomas", "Peter"], "text": "This is the document text!" }
{ "docId" : "6", "links" : { "a link": ["endpoint 1", "endpoint 2"], "another link": ["endpoint 3"] }, "authors" : ["Thomas", "Peter"], "text": "This is the document text!" }
apache-spark pyspark
1个回答
0
投票

文件加载正常(Spark 1.6,cloudera VM 5.12)。这里是。检查你的spark.driver.memory

>>> sqlContext.sql("select * from json.`file:///home/cloudera/data4.json`").show()
+---------------+-----+--------------------+--------------------+
|        authors|docId|               links|                text|
+---------------+-----+--------------------+--------------------+
|[Thomas, Peter]|    1|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    2|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    3|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    4|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    5|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    6|[WrappedArray(end...|This is the docum...|
+---------------+-----+--------------------+--------------------+

>>> sqlContext.read.json("file:///home/cloudera/data4.json").show()
+---------------+-----+--------------------+--------------------+
|        authors|docId|               links|                text|
+---------------+-----+--------------------+--------------------+
|[Thomas, Peter]|    1|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    2|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    3|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    4|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    5|[WrappedArray(end...|This is the docum...|
|[Thomas, Peter]|    6|[WrappedArray(end...|This is the docum...|
+---------------+-----+--------------------+--------------------+
© www.soinside.com 2019 - 2024. All rights reserved.