强制转换在持续存在之前不起作用

问题描述 投票:-1回答:1

我需要在持久化数据帧之前执行过滤器。为了强制执行此操作,我在filter + persist之后尝试了一个count()。当数据帧读取(在Step 2处)被保持(跳过任务)时,每次执行步骤4中的循环时执行相应的过滤器(filter #1)。

假设那是错误的

  1. filter #1在坚持之前执行?
  2. count将触发read->filter->persist的执行。

我想强制过滤器在持久化之前执行一次,因为在Step 4的每次迭代中遍历过滤器列表是一个非常昂贵的过程。

Step 1
val dataFrame2 = readData(sparkSession, paths)
.
.
.
Step 2
val dataFrame1 = readData(sparkSession, paths)
.filter(col("field1").isin(listOfRequiredVals: _*))//filter #1
.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

dataFrame1.count()
.
.
.
.
Step 3
val dataFrame4  = dataFrame2.filter(col("field2") === lit(52)) filter #2
.join(broadcast(dataFrame3), Seq("field3", "field4"))
.persist(StorageLevel.MEMORY_AND_DISK_SER_2)

Step 4
while( i > x) {
     dataFrame4.join(dataFrame2, "fieldX").write.parquet("path")
     i += 1
}

更新1:

  1. 通过引用正确的过滤器#更新了解释。
  2. 虽然由于过滤器#2引起的变化在Step 3中持续存在,但来自filter #1Step 2在持续存在之前未能执行。
scala apache-spark
1个回答
0
投票

这里的问题是需要一个MVCE,但我相信我已经找到了问题所在。对unpersist()的调用是在join之后(标志着持久性数据帧的结束),但在一个谱系触发之前 - 在write结果上的前join。因此,尽管在持久化之后放置了一个计数操作(但是在write之前通过取消它而无意识地使其失效,所以谱系涉及再次读取文件)。

© www.soinside.com 2019 - 2024. All rights reserved.