Spark 2.0 - 将JSON文件展平为CSV

问题描述 投票:0回答:2

我正在尝试将JSON文件转换为拼合的CSV文件。这是我试过的:

输出:enter image description here

我不知道如何正确操作spark sql中的qualify列并返回正确的值。

from pyspark.sql.functions import *

dummy = spark.read.json('dummy-3.json')
qualify = dummy.select("user_id", "rec_id", "uut", "hash", explode("qualify").alias("qualify"))
qualify.show()

+-------+------+---+------+--------------------+
|user_id|rec_id|uut|  hash|             qualify|
+-------+------+---+------+--------------------+
|      1|     2| 12|abc123|[cab321,test-1,of...|
|      1|     2| 12|abc123|[cab123,test-2,of...|
+-------+------+---+------+--------------------+

JSON示例:

{
  "user_id": 1,
  "rec_id": 2,
  "uut": 12,
  "hash": "abc123"
  "qualify":[{
    "offer": "offer-1",
    "name": "test-1",
    "hash": "cab321",
    "qualified": false"
    "rules": [{
      "name": "name of rule 1",
      "approved": true,
      "details": {}
    },
    {
    "name": "name of rule 2",
    "approved": false,
    "details": {}
    }]
  },{
    "offer": "offer-2",
    "name": "test-2",
    "hash": "cab123",
    "qualified": true
    "rules": [{
      "name": "name of rule 1",
      "approved": true,
      "details": {}
    },
    {
    "name": "name of rule 2",
    "approved": false,
    "details": {}
    }]
  }
}

JSON SCHEMA:

root
 |-- hash: string (nullable = true)
 |-- qualify: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- hash: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- offer: string (nullable = true)
 |    |    |-- qualified: boolean (nullable = true)
 |    |    |-- rules: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- approved: boolean (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |-- rec_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- uut: long (nullable = true)

我尝试将DataFrame转换为RDD并创建一个map函数来返回值,但我认为这不是一个好方法。我错了吗?

有人在处理过类似的问题吗?

谢谢你的帮助。

json csv apache-spark pyspark
2个回答
1
投票
qualify = dummy.withColumn('qualify',f.explode(dummy['qualify']))
result = qualify.withColumn('qualify_name', qualify['qualify']['name'])

你可以通过StructType()a.b输入a['b']


0
投票

我使用了explode函数,但为每次爆炸创建了一个新的数据帧。

df2 = df.select(col("userId").alias("user_id"),\
                col("recommendationId").alias("rec_id"),\
                col("utsId").alias("uts_id"),\
                col("gitHash").alias("git_hash"), \
                from_unixtime(col("createdAt")).alias("created"), \
                explode("qualifyResults").alias("qualify"))

df3 = df2.select("user_id",\
                 "rec_id",\
                 "uts_id",\
                 "git_hash",\
                 "created",\
                 col("qualify.offerId").alias("qualify_offer"),\
                 col("qualify.qualifyName").alias("qualify_name"),\
                 col("qualify.qualifyHash").alias("qualify_hash"),\
                 col("qualify.qualified").alias("qualify_qualified"),\
                 explode("qualify.rulesResult").alias("rules"))

#removi os details ate 
df4 = df3.select("user_id",\
                 "rec_id",\
                 "uts_id",\
                 "git_hash",\
                 "created",\
                 "qualify_offer",\
                 "qualify_name",\
                 "qualify_hash",\
                 "qualify_qualified",\
                 col("rules.name").alias("rule_name"),\
                 col("rules.approved").alias("rule_approved"),\
                 col("rules.details").alias("rule_details"))

使用这种方法,我能够获得我想要的CSV表单。

© www.soinside.com 2019 - 2024. All rights reserved.