How do I extract column values from nested JSON in a PySpark DataFrame?


I am working with a PySpark DataFrame (es_query) that contains nested JSON columns (r_json, brd_json, and vs_json). I need help extracting the data and storing it in another DataFrame (e_result) as two separate columns for the URL and product-number values, where each array element becomes a separate row.

In the end, all of the values have to land in a single DataFrame.

Sample data for each of these columns is shown below:

r_json:
results:
0: {"col1": "Yes", "name": "", "col2":  1, "col3": "76,67 €", "col4": "5,75 €", "productNumber": "B0e28213", "url": "https://www.am"}
1: {"col1": "Yes", "name": "", "col2":  1, "col3": "76,67 €", "col4": "5,75 €", "productNumber": "019883", "url": "https://www.am"}

brd_json:
array:
0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "11873628", "rating": "4.1", "url": "https://www.amazon"}
1: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "001838", "rating": "4.1", "url": "https://www.amazon"}

vs_json:
array:
0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "1212", "rating": "4.1", "url": "https://www.amazon"}
1: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "2321", "rating": "4.1", "url": "https://www.amazon"}

Could I get some help writing this script? Here is what I tried:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

result_urls = results.select("url").withColumn("source", lit("result"))
brsd_json_url = brsd_json.select("url").withColumn("source", lit("brand"))
vis_json_url = vis_json.select("url").withColumn("source", lit("video"))

combined_urls = result_urls.union(brsd_json_url).union(vis_json_url)

esearch_result = combined_urls.groupBy("source").agg({"url": "concat_ws"}).select("concat_ws(url) as prod_page_url_address")

I also tried the approach below, which extracts each column into a separate PySpark DataFrame, but the open question is how to combine all of the values into one DataFrame:

column_names = ["url", "productNumber", "col2", "col3", "col4", "col5", 
                "col6", "col7"]

# Define a function to create the expressions
def create_expr(column_name, json_column):
    try:
        expr = esearch_request_query.selectExpr(f"EXPLODE({json_column}.{column_name}) as {column_name}")
    except Exception:
        expr = None
    return expr

# Iterate over the column names
for column_name in column_names:
    results_expr = create_expr(column_name, "result_json.re")
    brsd_json_url  = create_expr(column_name, "brsd_json")
    vis_json_url  = create_expr(column_name, "vis_json")

    globals()[f"results_{column_name}"] = results_expr
    globals()[f"brsd_json_{column_name}"] = brsd_json_url
    globals()[f"vis_json_{column_name}"] = vis_json_url

# case where "col2" does not exist in sponsored_video_json
try:
    results_brandName = esearch_request_query.selectExpr("EXPLODE(result_json.re.col2) as col2_name")
except Exception:
    results_brandName = None

try:
    spond_col2 = esearch_request_query.selectExpr("EXPLODE(brsd_json.spon) as col2_name")
except Exception:
    spond_col2 = None

spovideo_col2 = None  # Handle this case separately, as it does not exist in vis_json
Tags: python, json, apache-spark, pyspark, azure-databricks
1 Answer

As you mentioned, you want to extract the column data and store it in another DataFrame (e_result). I defined the schema and tried the following approach:

from pyspark.sql.functions import explode, col

es_query = spark.createDataFrame([data], schema=schema)
r_json_df = es_query.select(explode("r_json").alias("r_json")).select(
    col("r_json.productNumber").alias("productNumber"),
    col("r_json.url").alias("url")
)
brd_json_df = es_query.select(explode("brd_json").alias("brd_json")).select(
    col("brd_json.productNumber").alias("productNumber"),
    col("brd_json.url").alias("url")
)
vs_json_df = es_query.select(explode("vs_json").alias("vs_json")).select(
    col("vs_json.productNumber").alias("productNumber"),
    col("vs_json.url").alias("url")
)
e_result = r_json_df.union(brd_json_df).union(vs_json_df)
e_result.show(truncate=False)

Result:

+-------------+------------------+
|productNumber|url               |
+-------------+------------------+
|B0e28213     |https://www.am    |
|019883       |https://www.am    |
|11873628     |https://www.amazon|
|001838       |https://www.amazon|
|1212         |https://www.amazon|
|2321         |https://www.amazon|
+-------------+------------------+
  • The code above selects the r_json array column from es_query, explodes it so that each element becomes its own row, and then selects the productNumber and url fields from the exploded r_json column.
  • Next, it extracts productNumber and url from the brd_json array column.
  • Then it extracts productNumber and url from the vs_json array column. Finally, it combines the DataFrames r_json_df, brd_json_df, and vs_json_df into a single DataFrame, e_result, using union.