Exploding nested arrays and storing them in separate rows with a PySpark dataframe takes too much time

Question (0 votes, 1 answer)

I am using the code snippet below to explode columns in a dataframe created from nested JSON. The query takes too long to complete. Could I get some suggestions on how to improve performance? I tried increasing the cluster size, but that did not help either, as code 52 is a major issue I am facing.

Here is the code I am using:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, when
from pyspark.sql.types import StructType, StructField, StringType


spark = SparkSession.builder.appName("YourAppName").config("spark.executor.memory", "4g").getOrCreate()

search_query = search_query.repartition(100)  # Adjust the number according to your data size.

search_query = search_query.limit(1000)  # Adjust the limit according to your needs

column_names = [
    "col1",
    "col2",
    "col3",
    "col4",
    "col5",
    "col6",
    "col7",
    "col8",
    "col9",
    "col10",
    "col11",
    "col12",
  
]


initial_schema = StructType([StructField(name, StringType(), True) for name in column_names])

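# Pull each field from whichever nested struct (prejson.results, prejson.sBrand or prejson.sVideo) holds a non-null value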
for col_name in column_names:
    search_query = search_query.withColumn(
        col_name,
        when(col(f"prejson.results.{col_name}").isNotNull(), col(f"prejson.results.{col_name}"))
        .when(col(f"prejson.sBrand.{col_name}").isNotNull(), col(f"prejson.sBrand.{col_name}"))
        .when(col(f"prejson.sVideo.{col_name}").isNotNull(), col(f"prejson.sVideo.{col_name}"))
        .otherwise(None)
    )

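# Explode each array column one at a time; every explode multiplies the row count by that array's length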
for col_name in column_names:
    search_query = search_query.withColumn(col_name, explode(col(col_name)))

results = search_query.select(column_names)
results.show()
python pyspark azure-databricks
1 Answer
0 votes

Without a sample of the prejson data, or more context about the data you are working with, it is hard to answer this question, but I have an idea of what the problem might be, and it needs more room to explain than would fit in a comment.

I think the code that is causing the out-of-memory error is:

for col_name in column_names:
    search_query = search_query.withColumn(col_name, explode(col(col_name)))

Take this example:
First, I explode the way it is described in the question. This makes the dataframe grow exponentially, because you are creating one row for every possible combination of elements across the lists. (With 12 array columns of, say, 3 elements each, that is 3^12 = 531,441 output rows per input row.)

Then I explode as if the lists were zipped; that means each element of a list corresponds to the element at the same index in the other lists.

Usually, when exploding multiple columns, the second behaviour is the goal, not the first.

from pyspark.sql import functions as f

# exploding like it's described in the question
initial_df = (
    spark.createDataFrame([
        ([1, 2, 3], ["a", "b", "c"])
    ], ['c1', 'c2'])
)
print("input dataframe:")
initial_df.show()

column_names = initial_df.columns
for col_name in column_names:
    initial_df = initial_df.withColumn(col_name, f.explode(col_name))
initial_df.show()
print(f"final record count: {initial_df.count()}")

# exploding like each element of an array is corresponding to one element of the other array (like a zip)
initial_df = (
    spark.createDataFrame([
        ([1, 2, 3], ["a", "b", "c"])
    ], ['c1', 'c2'])
)
initial_df = (
    initial_df
    .withColumn("zipped", f.explode(f.arrays_zip("c1", "c2")))
    .select("zipped.*")
)
initial_df.show()
print(f"final record count: {initial_df.count()}")

Output:

input dataframe:
+---------+---------+                                                           
|       c1|       c2|
+---------+---------+
|[1, 2, 3]|[a, b, c]|
+---------+---------+

+---+---+
| c1| c2|
+---+---+
|  1|  a|
|  1|  b|
|  1|  c|
|  2|  a|
|  2|  b|
|  2|  c|
|  3|  a|
|  3|  b|
|  3|  c|
+---+---+
final record count: 9
                                                           
+---+---+
| c1| c2|
+---+---+
|  1|  a|
|  2|  b|
|  3|  c|
+---+---+
final record count: 3
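
Applied to your dataframe, a sketch of the zipped approach could look something like the code below. This is only an illustration under the assumption that, once your first withColumn loop has filled col1 … col12 with arrays, those arrays are meant to line up element by element; whether that holds depends on the prejson data.

from pyspark.sql import functions as f

# Zip the twelve array columns element-wise and explode once:
# each output row carries the i-th element of every array, instead of
# the cross-product that twelve separate explode calls would produce.
results = (
    search_query
    .withColumn("zipped", f.explode(f.arrays_zip(*column_names)))
    .select("zipped.*")
)
results.show()

Note that arrays_zip pads shorter arrays with nulls, so rows are neither multiplied nor silently dropped when the array lengths differ.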