我正在尝试了解
filter pushdown
和 partition pruning
等 Spark 优化技术如何在实践中发挥作用。考虑以下代码片段:
df = spark.read.csv('path', header= True, inferSchema=True)
df.filter(col('salary') > 10000)
我的理解是过滤操作下推到数据源。这就是我到处阅读的原因,在
physical plan
中也可以看到同样的情况。
+- FileScan parquet [Salary#457,country_code#461] Batched: true, DataFilters: [isnotnull(country_code#461), (country_code#461 = IND)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/tables/bigData/withoutPartitionParquet], PartitionFilters: [], PushedFilters: [IsNotNull(country_code), EqualTo(country_code,IND)], ReadSchema: struct<Salary:int,country_code:string>
但我对这是如何发生的感到困惑。当我们最初使用
spark.read.csv
加载数据时,Spark在应用过滤器之前不是已经将数据加载到内存中了吗?如果它已经加载,那么过滤器如何应用于源?
我知道
catalyst optimizer
在执行之前优化查询,但是如果我先执行 read 部分,这表明已经运行了 2 个作业(一个用于加载数据,另一个用于 inferSchema)以将数据加载到内存中,然后过滤器是如何被推下的?
有人可以澄清 Spark 如何处理这个问题吗?我觉得我错过了 Spark 如何读取数据并应用这些过滤器的一些内容。
预先感谢您的任何见解!
Spark的读取函数生成物理计划,并将过滤器下推到优化逻辑计划中。您可能知道,Spark 为任何给定作业生成的计划都有以下阶段:
Parsed Logical Plan
-> Analyzed Logical Plan
-> Optimized Logical Plan
-> Physical Logical Plan
“下推”发生在优化阶段,由 Catalyst 优化器完成。某些优化(例如谓词下推或分区修剪)的有效性取决于数据源的功能。如果数据源不支持这些功能,Spark 无法将过滤器下推到数据源级别。 考虑以下场景:
案例 1:从 CSV 文件读取数据(INFERSCHEMA 已启用)考虑将下表创建为数据框
df
并注意工资列的类型是
STRING
而不是 INTEGER
名称 [STRING] | 国家代码 [STRING] | 工资 [STRING] | |
---|---|---|---|
鲍勃 | IND | 10000 | |
母鹿 | IND | 20000 | |
约翰 | 澳大利亚 | 15000 | |
玛格丽特 | 澳大利亚 | 11000 | |
巴托 | 美国 | 10000 | |
利索 | 美国 | 12000 | |
磁笛 | 美国 | 14000 | |
佛罗多 | 英国 | 10400 | |
萨明顿 | 英国 | 10200 | |
格鲁吉亚 | 新西兰 | 23000 |
country_code=IND
和
salary>10000
将如下所示:df.filter((df["salary"] > 10000) & (df["country_code"] == "IND")).show()
对应的Physical Plan如下:
== Physical Plan ==
*(1) Filter (isnotnull(country_code#19) AND (country_code#19 = IND))
+- FileScan csv [id#17,name#18,country_code#19,salary#20] Batched: false, .. PushedFilters: [IsNotNull(country_code), EqualTo(country_code,IND)]...
此处,物理计划
PushedFilter
优化仅部分应用于
country_code
列,因为它在物理计划中提到了 PushedFilters: [IsNotNull(country_code), EqualTo(country_code,IND)]
。案例 2:显式指定架构一旦我使用
spark.read.csv
读取相同的数据帧并启用下面定义的
header=True
和 schema=<CustomSchema>
:schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("country_code", StringType(), True),
StructField("salary", IntegerType(), True)
])
现在,为正在过滤的两列启用相同的过滤条件,更改
和下推过滤器。 Spark 可以理解具有给定架构的
csv
数据源,它现在可以将过滤器下推直到源读取扫描步骤,并避免将不必要的数据量拉入内存。
df.filter((df["salary"] > 10000) & (df["country_code"] == "IND")).show()
*(1) Filter (((isnotnull(salary#97) AND isnotnull(country_code#96)) AND (salary#97 >= 10000)) AND (country_code#96 = IND))
+- FileScan csv [id#94,name#95,country_code#96,salary#97] Batched: false,
PushedFilters: [IsNotNull(salary), IsNotNull(country_code), GreaterThanOrEqual(salary,10000), EqualTo(country_code, IND)..
现在,
PushedFilter
(
PushedFilters: [IsNotNull(salary), IsNotNull(country_code), GreaterThanOrEqual(salary,10000), EqualTo(country_code, IND)
) 应用于两个列(因为 Spark 不必将列 salary
从推断的 StringType
转换为 IntegerType
,因为我们明确提到了模式)。