Where to cache when the same DataFrame name is reused


I have a DataFrame named source_dataframe that is referenced in several places in my PySpark code. I therefore plan to cache source_dataframe so that the cached data is reused instead of the same data being re-read multiple times.

source_dataframe = spark.read.format("delta").table("schema.table_name")
source_dataframe = source_dataframe.filter(condition).select(list_of_columns_needed)

# Planning to call source_dataframe.cache() here, so the cached data
# can be reused by the multiple references below.

# usage-1 of source_dataframe
columns_to_be_renamed = ["col_1","col_2","col_3"]
for c in columns_to_be_renamed:
    source_dataframe = source_dataframe.withColumn(c, 
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )

# usage-2 and other reference of source_dataframe
...

In the for loop I transform the values of the columns listed in columns_to_be_renamed, and I need to keep the same name, source_dataframe. If I instead assign the result to new_dataframe as shown below, only the last column's values end up transformed, because each iteration starts again from source_dataframe and overwrites the result of the previous iteration.

columns_to_be_renamed = ["col_1","col_2","col_3"]
for c in columns_to_be_renamed:
    new_dataframe = source_dataframe.withColumn(c, 
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )
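The overwrite problem above can be sketched in plain Python, with no Spark required: each "transformation" returns a new value, just as withColumn returns a new DataFrame, so only chaining the results preserves all three changes. The rename helper here is a made-up stand-in, not a Spark API.

```python
from functools import reduce

# Stand-in for a transformation that returns a NEW value each time,
# mirroring how DataFrame.withColumn returns a new DataFrame.
def rename(acc, c):
    return acc + [f"renamed:{c}"]

columns = ["col_1", "col_2", "col_3"]
base = []

# Overwrite pattern: every iteration restarts from the ORIGINAL base,
# so only the last column's change survives.
overwritten = None
for c in columns:
    overwritten = rename(base, c)   # always starts from `base`

# Chaining pattern: feed each result back in, so all three changes apply.
chained = reduce(rename, columns, base)

print(overwritten)  # ['renamed:col_3']
print(chained)      # ['renamed:col_1', 'renamed:col_2', 'renamed:col_3']
```

In Spark the same chaining can be written once, without rebinding source_dataframe, e.g. `new_dataframe = reduce(lambda df, c: df.withColumn(c, ...), columns_to_be_renamed, source_dataframe)`.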

Given this, should I cache source_dataframe immediately after reading it, or only after the for loop? My concern is that, because the DataFrame name is the same before and after the loop, caching right after the read might cause references to source_dataframe after the loop to point at the un-cached version produced inside the loop rather than the cached one.

apache-spark pyspark
1 Answer

"caching right after the read might cause references to source_dataframe after the loop to point at the un-cached version produced inside the loop rather than the cached one"

No. Each withColumn builds on the existing DataFrame by reference, so it is essentially the same lineage throughout, and you can place .cache() at the beginning. You can always verify a DataFrame's state with .explain(), which shows the execution plan. So in your case:

source_dataframe = spark.read.format("delta").table("schema.table_name").cache()

columns_to_be_renamed = ["col_1", "col_2", "col_3"]
for c in columns_to_be_renamed:
    source_dataframe = source_dataframe.withColumn(c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )

new_dataframe = None
for c in columns_to_be_renamed:
    new_dataframe = source_dataframe.withColumn(c,
        when(trim(col(c)) == "", None)
        .otherwise(concat(lit(c), lit("_"), trim(col(c))))
    )

new_dataframe.explain()

will show this plan:

== Physical Plan ==
*(1) Project [CASE WHEN (trim(col_1#17, None) = ) THEN null ELSE concat(col_1, _, trim(col_1#17, None)) END AS col_1#38, CASE WHEN (trim(col_2#18, None) = ) THEN null ELSE concat(col_2, _, trim(col_2#18, None)) END AS col_2#43, CASE WHEN (trim(CASE WHEN (trim(col_3#19, None) = ) THEN null ELSE concat(col_3, _, trim(col_3#19, None)) END, None) = ) THEN null ELSE concat(col_3, _, trim(CASE WHEN (trim(col_3#19, None) = ) THEN null ELSE concat(col_3, _, trim(col_3#19, None)) END, None)) END AS col_3#59]
+- InMemoryTableScan [col_1#17, col_2#18, col_3#19]
      +- InMemoryRelation [col_1#17, col_2#18, col_3#19], StorageLevel(disk, memory, deserialized, 1 replicas)
            +- FileScan csv [col_1#17,col_2#18,col_3#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/../resso..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<col_1:string,col_2:string,col_3:string>

As you can see, the InMemoryTableScan operator appears in the plan, which means the cached data is being used.
