在整个 ETL 期间使用相同变量时在 Spark 中持久化和取消持久化

问题描述 投票:0回答:1

如果我的 etl 过程的所有步骤都具有相同的变量名称,

persist()
unpersist()
将如何工作? 例如:

df =  new dataframe created by reading json for instance i dunno
df.persist()
df = df.withColumn(some_transformation)
df = df.another_transofrmation
df = df.probably_even_more_transformations
df.unpersist()
  1. 到底什么是持久化和不持久化?
  2. persist()
    unpersist()
    方法是动作还是转换?
  3. 每次
    spark.catalog.clearCache()
    unpersist()
    persist()
  4. 哪个更好

提前谢谢您

apache-spark pyspark persist
1个回答
0
投票

在 Spark 中,

persist
unpersist
用于在使用 DataFrame 时管理内存和磁盘使用情况。

您的场景:

+-------------------+    +-------------------+    +-------------------------------+
|                   |    |                   |    |                               |
| Read JSON to df   | -> | df.persist()      | -> | Transformations on df         |
|                   |    | (Stores df in     |    | (df keeps getting reassigned) |
|                   |    | memory/disk)      |    |                               |
+-------------------+    +-------------------+    +-------------------------------+
                                   |                                 |
                                   V                                 V
                          +-------------------+         +--------------------------------+
                          |                   |         |                                |
                          | Data is persisted |         | Reassignments do not affect    |
                          | during all these  |         | persisted data until unpersist |
                          | transformations   |         | is called                      |
                          |                   |         |                                |
                          +-------------------+         +--------------------------------+
                                                                      |
                                                                      V
                                                            +--------------------+
                                                            |                    |
                                                            | df.unpersist()     |
                                                            | (Frees up storage) |
                                                            |                    |
                                                            +--------------------+

所以:

  1. 什么是坚持和不坚持?

    DataFrame

    df
    在调用
    persist()
    方法后被持久化。此后应用于
    df
    的所有转换都不会影响持久数据,直到调用
    unpersist()
    为止。当您通过转换重新分配
    df
    时,原始持久数据帧将保留在内存/磁盘中,直到
    unpersist()

  2. persist()
    unpersist()
    方法是动作还是转换?

    persist()
    unpersist()
    不是典型 Spark 意义上的操作或转换。它们是管理存储的方法。
    persist()
    提示 Spark 将 DataFrame 保留在内存或磁盘中,
    unpersist()
    释放存储空间。它们不会像操作那样触发计算,也不会像转换那样创建新的数据帧。

  3. 每次

    spark.catalog.clearCache()
    之后
    unpersist()
    persist()
    哪个更好?

    • unpersist()
      :专门删除其所调用的 DataFrame 的持久性。当您知道不再需要哪个 DataFrame 时,它会更加受控和精确。
    • spark.catalog.clearCache()
      :清除Spark会话中所有缓存的DataFrame。它更像是全局重置,当您想要确保释放所有缓存数据时很有用,但它不太精确。

在 ETL 过程中,当您完成特定 DataFrame 来释放资源时,更典型的是使用

unpersist()
clearCache()
可以在流程结束时使用,或者当您想确保所有缓存数据都是干净的。

另请参阅“Spark – 缓存和持久之间的区别?”来自 Naveen Nelamali,2023 年 10 月

cache()
persist()
都是优化技术,用于存储RDD、DataFrame和Dataset的中间计算,以便它们可以在后续操作中重用。主要区别在于
cache()
默认将数据存储在内存中 (
MEMORY_ONLY
),而
persist()
允许指定存储级别,包括
MEMORY_AND_DISK
,这是 DataFrame 和 Dataset 的默认值。

  • persist()
    方法可以不带参数使用,默认为
    MEMORY_AND_DISK
    存储级别,或者使用
    StorageLevel
    参数来指定不同的存储级别,如
    MEMORY_ONLY
    MEMORY_AND_DISK_SER
    等……

  • unpersist()
    方法用于从内存或存储中删除持久化的DataFrame或Dataset。它有一个布尔参数,设置后会阻止操作,直到删除所有块。

Spark 自动监控

persist()
cache()
调用,并且可能会按照最近最少使用 (LRU) 算法丢弃未使用的持久数据。需要注意的是,Spark 中的缓存是一种惰性操作,这意味着在某个操作触发它之前,DataFrame 或 Dataset 不会被缓存。

在 ETL 过程的上下文中,始终使用相同的变量名称,

persist()
unpersist()
方法控制 DataFrame 在过程的不同阶段的存储和释放。
在转换期间,DataFrame 将保留在指定的存储级别(DataFrame 的默认存储级别为
MEMORY_AND_DISK
),直到显式调用
unpersist()

这样可以通过避免在每个转换阶段重新计算 DataFrame 来优化性能。

© www.soinside.com 2019 - 2024. All rights reserved.