I have a DataFrame named source_dataframe that is referenced in several places in my PySpark code, so I plan to cache source_dataframe so that the cached reference is reused instead of re-reading the same data multiple times.
from pyspark.sql.functions import col, concat, lit, trim, when

source_dataframe = spark.read.format("delta").table("schema.table_name")
source_dataframe = source_dataframe.filter(condition).select(list_of_columns_needed)
# Planning to cache here: source_dataframe.cache(), so the cached data
# can be reused by the multiple references below.

# usage-1 of source_dataframe
columns_to_be_renamed = ["col_1", "col_2", "col_3"]
for c in columns_to_be_renamed:
    source_dataframe = source_dataframe.withColumn(
        c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )

# usage-2 and other references of source_dataframe
...
In the for loop I rewrite the values of the columns listed in columns_to_be_renamed, and I need to keep the result under the same name, source_dataframe. If I instead assign the result to new_dataframe as shown below, only the last column's values end up updated, because each iteration starts again from source_dataframe and overwrites the previous result.
columns_to_be_renamed = ["col_1", "col_2", "col_3"]
for c in columns_to_be_renamed:
    new_dataframe = source_dataframe.withColumn(
        c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )
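The overwrite behaviour can be reproduced without Spark at all. A minimal pure-Python sketch, where dicts stand in for DataFrames and with_column is a hypothetical stand-in for withColumn (both are illustrative names, not Spark APIs):

```python
# Pure-Python sketch of the overwrite problem: dicts stand in for
# DataFrames; with_column, like DataFrame.withColumn, returns a NEW
# object and leaves its input untouched.
source = {"col_1": "a", "col_2": "b", "col_3": "c"}

def with_column(df, c):
    out = dict(df)
    out[c] = f"{c}_{df[c]}"
    return out

# Buggy: every iteration starts again from `source`,
# so only the last column's change survives in new_df.
new_df = None
for c in ["col_1", "col_2", "col_3"]:
    new_df = with_column(source, c)
print(new_df)  # {'col_1': 'a', 'col_2': 'b', 'col_3': 'col_3_c'}

# Fixed: reassign the accumulating variable each iteration.
acc = source
for c in ["col_1", "col_2", "col_3"]:
    acc = with_column(acc, c)
print(acc)  # {'col_1': 'col_1_a', 'col_2': 'col_2_b', 'col_3': 'col_3_c'}
```

This is why reassigning to the same name (source_dataframe or an accumulator) is required to keep all three column updates.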
Given this, should I cache source_dataframe immediately after reading it, or only after the for loop? The concern is that, because the DataFrame name is the same before and after the loop, caching right after the read might cause references to source_dataframe after the loop to point at the uncached version produced inside the for loop rather than the cached one.
No — each transformation builds on a reference to the same underlying cached data, so every DataFrame derived from it (including the reassigned source_dataframe after the loop) still reads from the cache. You can therefore put
.cache()
right at the beginning. You can always verify a DataFrame's state with
.explain()
, which prints its execution plan. So in your case:
from pyspark.sql.functions import col, concat, lit, trim, when

source_dataframe = spark.read.format("delta").table("schema.table_name").cache()

columns_to_be_renamed = ["col_1", "col_2", "col_3"]
for c in columns_to_be_renamed:
    source_dataframe = source_dataframe.withColumn(
        c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )
columns_to_be_renamed = ["col_1", "col_2", "col_3"]
new_dataframe = None
for c in columns_to_be_renamed:
    new_dataframe = source_dataframe.withColumn(
        c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )

new_dataframe.explain()
will print a plan like this (from my local test, which read a CSV file rather than a Delta table):
== Physical Plan ==
*(1) Project [CASE WHEN (trim(col_1#17, None) = ) THEN null ELSE concat(col_1, _, trim(col_1#17, None)) END AS col_1#38, CASE WHEN (trim(col_2#18, None) = ) THEN null ELSE concat(col_2, _, trim(col_2#18, None)) END AS col_2#43, CASE WHEN (trim(CASE WHEN (trim(col_3#19, None) = ) THEN null ELSE concat(col_3, _, trim(col_3#19, None)) END, None) = ) THEN null ELSE concat(col_3, _, trim(CASE WHEN (trim(col_3#19, None) = ) THEN null ELSE concat(col_3, _, trim(col_3#19, None)) END, None)) END AS col_3#59]
+- InMemoryTableScan [col_1#17, col_2#18, col_3#19]
+- InMemoryRelation [col_1#17, col_2#18, col_3#19], StorageLevel(disk, memory, deserialized, 1 replicas)
+- FileScan csv [col_1#17,col_2#18,col_3#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/../resso..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col_1:string,col_2:string,col_3:string>
As you can see, the InMemoryTableScan operator appears in the plan, which means the cached data is being used.
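The point that one early .cache() serves every downstream usage can also be illustrated without Spark. A hedged pure-Python analogy (a memoized loader playing the role of InMemoryRelation; this is not Spark's actual machinery, and load_table is a hypothetical helper):

```python
# Memoization analogy for DataFrame.cache(): the first call pays for
# the expensive scan; every later reference, however many derived
# usages exist, reuses the in-memory copy.
scan_count = 0
_cache = {}

def load_table(name):
    global scan_count
    if name not in _cache:
        scan_count += 1                          # the expensive scan
        _cache[name] = [f"{name}#{i}" for i in range(3)]
    return _cache[name]

usage_1 = [row.upper() for row in load_table("schema.table_name")]
usage_2 = [row[::-1] for row in load_table("schema.table_name")]
print(scan_count)  # 1 -> the source was scanned exactly once
```

One difference worth noting: unlike this analogy, Spark's cache() is lazy — the data is only materialized when the first action runs on the cached DataFrame.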