了解 Spark 过滤器下推:它如何与数据加载交互?

问题描述 投票:0回答:1

我正在尝试了解

filter pushdown
partition pruning
等 Spark 优化技术如何在实践中发挥作用。考虑以下代码片段:

df = spark.read.csv('path', header= True, inferSchema=True)

df.filter(col('salary') > 10000)

我的理解是过滤操作下推到数据源。这就是我到处阅读的原因,在

physical plan
中也可以看到同样的情况。

+- FileScan parquet [Salary#457,country_code#461] Batched: true, DataFilters: [isnotnull(country_code#461), (country_code#461 = IND)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/tables/bigData/withoutPartitionParquet], PartitionFilters: [], PushedFilters: [IsNotNull(country_code), EqualTo(country_code,IND)], ReadSchema: struct<Salary:int,country_code:string>

但我对这是如何发生的感到困惑。当我们最初使用

spark.read.csv
加载数据时,Spark在应用过滤器之前不是已经将数据加载到内存中了吗?如果它已经加载,那么过滤器如何应用于源?

我知道

catalyst optimizer
在执行之前优化查询,但是如果我先执行 read 部分,这表明已经运行了 2 个作业(一个用于加载数据,另一个用于 inferSchema)以将数据加载到内存中,然后过滤器是如何被推下的?

有人可以澄清 Spark 如何处理这个问题吗?我觉得我错过了 Spark 如何读取数据并应用这些过滤器的一些内容。

预先感谢您的任何见解!

apache-spark pyspark apache-spark-sql
1个回答
0
投票

Spark的读取函数生成物理计划,并将过滤器下推到优化逻辑计划中。您可能知道,Spark 为任何给定作业生成的计划都有以下阶段:

Parsed Logical Plan
->
Analyzed Logical Plan
->
Optimized Logical Plan
->
Physical Logical Plan

“下推”发生在优化阶段,由 Catalyst 优化器完成。某些优化(例如谓词下推或分区修剪)的有效性取决于数据源的功能。如果数据源不支持这些功能,Spark 无法将过滤器下推到数据源级别。 考虑以下场景:

案例 1:从 CSV 文件读取数据(INFERSCHEMA 已启用)

考虑将下表创建为数据框

df

并注意工资列的类型是

STRING
而不是
INTEGER

id [STRING]0123456789
名称 [STRING] 国家代码 [STRING] 工资 [STRING]
鲍勃 IND 10000
母鹿 IND 20000
约翰 澳大利亚 15000
玛格丽特 澳大利亚 11000
巴托 美国 10000
利索 美国 12000
磁笛 美国 14000
佛罗多 英国 10400
萨明顿 英国 10200
格鲁吉亚 新西兰 23000
然后,应用以下过滤器
country_code=IND

salary>10000
将如下所示:
df.filter((df["salary"] > 10000) & (df["country_code"] == "IND")).show()

对应的Physical Plan如下:

== Physical Plan == *(1) Filter (isnotnull(country_code#19) AND (country_code#19 = IND)) +- FileScan csv [id#17,name#18,country_code#19,salary#20] Batched: false, .. PushedFilters: [IsNotNull(country_code), EqualTo(country_code,IND)]...

此处,物理计划 
PushedFilter

优化仅部分应用于

country_code
列,因为它在物理计划中提到了
PushedFilters: [IsNotNull(country_code), EqualTo(country_code,IND)]

案例 2:显式指定架构

一旦我使用

spark.read.csv

读取相同的数据帧并启用下面定义的

header=True
schema=<CustomSchema>
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("country_code", StringType(), True),
    StructField("salary", IntegerType(), True)
])

现在,为正在过滤的两列启用相同的过滤条件,
更改

下推过滤器 Spark 可以理解具有给定架构的

csv

数据源,它现在可以将过滤器下推直到源读取扫描步骤,并避免将不必要的数据量拉入内存。

df.filter((df["salary"] > 10000) & (df["country_code"] == "IND")).show()

*(1) Filter (((isnotnull(salary#97) AND isnotnull(country_code#96)) AND (salary#97 >= 10000)) AND (country_code#96 = IND))
+- FileScan csv [id#94,name#95,country_code#96,salary#97] Batched: false,
PushedFilters: [IsNotNull(salary), IsNotNull(country_code), GreaterThanOrEqual(salary,10000), EqualTo(country_code, IND)..
现在,
PushedFilter

(

PushedFilters: [IsNotNull(salary), IsNotNull(country_code), GreaterThanOrEqual(salary,10000), EqualTo(country_code, IND)
) 应用于两个列(因为 Spark 不必将列
salary
从推断的
StringType
转换为
IntegerType
,因为我们明确提到了模式)。
    

© www.soinside.com 2019 - 2024. All rights reserved.