I'm using the code snippet below to explode columns in a dataframe created from nested JSON. The query takes far too long to complete. Can I get some suggestions on how to improve performance? I tried increasing the cluster size, but that didn't help either, and error code 52 is a major problem I'm running into.
Here is the code I'm using:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, when
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder.appName("YourAppName").config("spark.executor.memory", "4g").getOrCreate()
search_query = search_query.repartition(100) # Adjust the number according to your data size.
search_query = search_query.limit(1000) # Adjust the limit according to your needs
column_names = [
    "col1",
    "col2",
    "col3",
    "col4",
    "col5",
    "col6",
    "col7",
    "col8",
    "col9",
    "col10",
    "col11",
    "col12",
]
initial_schema = StructType([StructField(name, StringType(), True) for name in column_names])
for col_name in column_names:
    search_query = search_query.withColumn(
        col_name,
        when(col(f"prejson.results.{col_name}").isNotNull(), col(f"prejson.results.{col_name}"))
        .when(col(f"prejson.sBrand.{col_name}").isNotNull(), col(f"prejson.sBrand.{col_name}"))
        .when(col(f"prejson.sVideo.{col_name}").isNotNull(), col(f"prejson.sVideo.{col_name}"))
        .otherwise(None)
    )

for col_name in column_names:
    search_query = search_query.withColumn(col_name, explode(col(col_name)))

results = search_query.select(column_names)
results.show()
Without a sample of the prejson data or more context about the data you're working with, it's hard to answer this precisely, but I have an idea of what the problem might be, and it needs more room to explain than would fit in a comment.
I think the code causing the out-of-memory error is this one:
for col_name in column_names:
    search_query = search_query.withColumn(col_name, explode(col(col_name)))
Consider the example below.
First, I explode the columns the way the question does. The dataframe blows up, because a row is created for every combination of elements across the arrays, effectively a Cartesian product, so the row count multiplies with every exploded column.
Then, I explode the columns as if the arrays were zipped together, meaning each element of one array is paired with the element at the same index in the other arrays.
Usually, when exploding multiple columns, the second behaviour is the one you want, not the first.
# exploding like it's described in the question
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

initial_df = (
    spark.createDataFrame([
        ([1, 2, 3], ["a", "b", "c"])
    ], ['c1', 'c2'])
)
print("input dataframe:")
initial_df.show()

column_names = initial_df.columns
for col_name in column_names:
    initial_df = initial_df.withColumn(col_name, f.explode(col_name))

initial_df.show()
print(f"final record count: {initial_df.count()}")
# exploding like each element of an array is corresponding to one element of the other array (like a zip)
initial_df = (
    spark.createDataFrame([
        ([1, 2, 3], ["a", "b", "c"])
    ], ['c1', 'c2'])
)

initial_df = (
    initial_df
    .withColumn("zipped", f.explode(f.arrays_zip("c1", "c2")))
    .select("zipped.*")
)

initial_df.show()
print(f"final record count: {initial_df.count()}")
Output:
input dataframe:
+---------+---------+
| c1| c2|
+---------+---------+
|[1, 2, 3]|[a, b, c]|
+---------+---------+
+---+---+
| c1| c2|
+---+---+
| 1| a|
| 1| b|
| 1| c|
| 2| a|
| 2| b|
| 2| c|
| 3| a|
| 3| b|
| 3| c|
+---+---+
final record count: 9
+---+---+
| c1| c2|
+---+---+
| 1| a|
| 2| b|
| 3| c|
+---+---+
final record count: 3
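If the twelve columns in the question really are arrays of the same length once they have been pulled out of prejson.results / prejson.sBrand / prejson.sVideo, the same zip-style explode can replace the per-column loop. The following is only a minimal sketch under that assumption; the input dataframe and the three column names are made-up stand-ins, not your actual data:

# minimal sketch, assuming every column already holds arrays of equal length
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

# hypothetical stand-in for search_query after the when/otherwise step
search_query = spark.createDataFrame(
    [([1, 2, 3], ["a", "b", "c"], [10.0, 20.0, 30.0])],
    ["col1", "col2", "col3"],
)
column_names = ["col1", "col2", "col3"]

results = (
    search_query
    # one output row per array index instead of one per combination of elements
    .withColumn("zipped", f.explode(f.arrays_zip(*column_names)))
    .select("zipped.*")
)
results.show()

With this approach the row count stays equal to the array length instead of being multiplied by every exploded column, which is what was driving the out-of-memory behaviour.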