我需要在持久化数据帧之前执行过滤器。为了强制执行此操作,我在filter + persist之后尝试了一个count()
。当数据帧读取(在Step 2
处)被保持(跳过任务)时,每次执行步骤4中的循环时执行相应的过滤器(filter #1
)。
假设那是错误的
filter #1
在坚持之前执行?read->filter->persist
的执行。我想强制过滤器在持久化之前执行一次,因为在Step 4
的每次迭代中遍历过滤器列表是一个非常昂贵的过程。
Step 1
val dataFrame2 = readData(sparkSession, paths)
.
.
.
Step 2
val dataFrame1 = readData(sparkSession, paths)
.filter(col("field1").isin(listOfRequiredVals: _*))//filter #1
.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
dataFrame1.count()
.
.
.
.
Step 3
val dataFrame4 = dataFrame2.filter(col("field2") === lit(52)) filter #2
.join(broadcast(dataFrame3), Seq("field3", "field4"))
.persist(StorageLevel.MEMORY_AND_DISK_SER_2)
Step 4
while( i > x) {
dataFrame4.join(dataFrame2, "fieldX").write.parquet("path")
i += 1
}
更新1:
Step 3
中持续存在,但来自filter #1
的Step 2
在持续存在之前未能执行。这里的问题是需要一个MVCE,但我相信我已经找到了问题所在。对unpersist()
的调用是在join
之后(标志着持久性数据帧的结束),但在一个谱系触发之前 - 在write
结果上的前join
。因此,尽管在持久化之后放置了一个计数操作(但是在write
之前通过取消它而无意识地使其失效,所以谱系涉及再次读取文件)。