I'm working with a PySpark dataframe (es_query) that contains nested JSON columns (r_json, brd_json, vs_json). I need help extracting the column data and storing it in another dataframe (e_result) as two separate columns holding the URL and product-number values, where each array element becomes its own row. Finally, all of the values must end up in a single dataframe.
Sample data for these columns:
r_json:
results:
0: {"col1": "Yes", "name": "", "col2": 1, "col3": "76,67 €", "col4": "5,75 €", "productNumber": "B0e28213", "url": "https://www.am"}
1: {"col1": "Yes", "name": "", "col2": 1, "col3": "76,67 €", "col4": "5,75 €", "productNumber": "019883", "url": "https://www.am"}
brd_json:
array:
0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "11873628", "rating": "4.1", "url": "https://www.amazon"}
1: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "001838", "rating": "4.1", "url": "https://www.amazon"}
vs_json:
array:
0: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "1212", "rating": "4.1", "url": "https://www.amazon"}
1: {"col1": "Yes", "col2": "https://m.media-a", "col3": null, "col4": "Yes", "col5": "No", "col6": false, "productNumber": "2321", "rating": "4.1", "url": "https://www.amazon"}
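As a quick sanity check (plain Python, no Spark needed): once the stray commas in the samples are removed, each record is valid JSON, and the two keys the target dataframe needs are present in every record. The record below is copied verbatim from the r_json sample:

```python
import json

# One record from r_json.results (stray comma removed); values are
# taken verbatim from the sample above.
record = json.loads(
    '{"col1": "Yes", "name": "", "col2": 1, "col3": "76,67 \u20ac", '
    '"col4": "5,75 \u20ac", "productNumber": "B0e28213", "url": "https://www.am"}'
)

# The target dataframe needs exactly these two keys per record.
pair = (record["productNumber"], record["url"])
print(pair)  # ('B0e28213', 'https://www.am')
```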
Can I get some help writing this script? Here is what I tried first:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, collect_list, concat_ws

# Tag each per-source URL frame, then stack them.
result_urls = results.select("url").withColumn("source", lit("result"))
brsd_json_url = brsd_json.select("url").withColumn("source", lit("brand"))
vis_json_url = vis_json.select("url").withColumn("source", lit("video"))  # was reading brsd_json by mistake
combined_urls = result_urls.union(brsd_json_url).union(vis_json_url)
# agg({"url": "concat_ws"}) is not a valid dict-style aggregation;
# collect the URLs per group, then join them into one string.
esearch_result = combined_urls.groupBy("source").agg(
    concat_ws(",", collect_list("url")).alias("prod_page_url_address")
)
I also tried the approach below, which does extract each column into its own PySpark dataframe, but the open question is how to combine all of those values into one dataframe:
column_names = ["url", "productNumber", "col2", "col3", "col4", "col5",
                "col6", "col7"]

# Explode one field of a JSON column into its own dataframe;
# return None when the field does not exist.
def create_expr(column_name, json_column):
    try:
        return esearch_request_query.selectExpr(
            f"EXPLODE({json_column}.{column_name}) AS {column_name}"
        )
    except Exception:
        return None

# Build one dataframe per (source, column) pair.
for column_name in column_names:
    globals()[f"results_{column_name}"] = create_expr(column_name, "result_json.re")
    globals()[f"brsd_json_{column_name}"] = create_expr(column_name, "brsd_json")
    globals()[f"vis_json_{column_name}"] = create_expr(column_name, "vis_json")

# Case where "col2" does not exist in sponsored_video_json
try:
    results_brandName = esearch_request_query.selectExpr(
        "EXPLODE(result_json.re.col2) AS col2_name"
    )
except Exception:
    results_brandName = None
try:
    spond_col2 = esearch_request_query.selectExpr(
        "EXPLODE(brsd_json.spon) AS col2_name"
    )
except Exception:
    spond_col2 = None
spovideo_col2 = None  # handle separately: this field does not exist in vis_json
As you mentioned, you want to extract the column data and store it in another dataframe (e_result). I defined the schema and tried the following approach:
from pyspark.sql.functions import col, explode

es_query = spark.createDataFrame([data], schema=schema)

# Explode each array-of-struct column, then keep only the two fields we need.
r_json_df = es_query.select(explode("r_json").alias("r_json")).select(
    col("r_json.productNumber").alias("productNumber"),
    col("r_json.url").alias("url")
)
brd_json_df = es_query.select(explode("brd_json").alias("brd_json")).select(
col("brd_json.productNumber").alias("productNumber"),
col("brd_json.url").alias("url")
)
vs_json_df = es_query.select(explode("vs_json").alias("vs_json")).select(
col("vs_json.productNumber").alias("productNumber"),
col("vs_json.url").alias("url")
)
e_result = r_json_df.union(brd_json_df).union(vs_json_df)
e_result.show(truncate=False)
Result:
+-------------+------------------+
|productNumber|url |
+-------------+------------------+
|B0e28213 |https://www.am |
|019883 |https://www.am |
|11873628 |https://www.amazon|
|001838 |https://www.amazon|
|1212 |https://www.amazon|
|2321 |https://www.amazon|
+-------------+------------------+