In a recent SO post I found that using `withColumn` may improve the DAG when dealing with stacked/chained column expressions in combination with distinct window specs. However, in this example `withColumn` actually makes the DAG worse and deviates from the outcome of using `select` instead.

First, some test data (PySpark 2.4.4 standalone):
```python
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),
        "col5": np.random.randint(0, 5, size=100),
    }
)
df = spark.createDataFrame(dfp)
df.show(5)
```

```
+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|   0|   3|   2|   2|   2|
|   1|   3|   3|   2|   4|
|   0|   0|   3|   3|   2|
|   3|   0|   1|   4|   4|
|   4|   0|   3|   3|   3|
+----+----+----+----+----+
only showing top 5 rows
```
The example is simple: 2 window specs and 4 independent column expressions based on them:
```python
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

col_w1_1 = F.max("col5").over(w1).alias("col_w1_1")
col_w1_2 = F.sum("col5").over(w1).alias("col_w1_2")
col_w2_1 = F.max("col5").over(w2).alias("col_w2_1")
col_w2_2 = F.sum("col5").over(w2).alias("col_w2_2")

expr = [col_w1_1, col_w1_2, col_w2_1, col_w2_2]
```
withColumn - 4 shuffles
If `withColumn` is used with alternating window specs, the DAG contains unnecessary shuffles:
```python
df.withColumn("col_w1_1", col_w1_1)\
  .withColumn("col_w2_1", col_w2_1)\
  .withColumn("col_w1_2", col_w1_2)\
  .withColumn("col_w2_2", col_w2_2)\
  .explain()
```

```
== Physical Plan ==
Window [sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#143L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(3) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#145L], [col3#90L], [col4#91L ASC NULLS FIRST]
                  +- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(col3#90L, 200)
                        +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#141L], [col1#88L], [col2#89L ASC NULLS FIRST]
                           +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col1#88L, 200)
                                 +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
```
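One way to quantify the shuffles is to count the `Exchange` operators in the plan text (one per shuffle). A minimal pure-Python sketch; the `count_shuffles` helper and the abbreviated plan string below are illustrations, not part of the original post, and in PySpark 2.4 you would have to capture the plan string yourself since `explain()` only prints:

```python
def count_shuffles(plan_text):
    """Count Exchange operators (one per shuffle) in a physical plan string."""
    return sum(
        line.strip().lstrip("+- ").startswith("Exchange")
        for line in plan_text.splitlines()
    )

# Abbreviated stand-in for the plan printed above (4 shuffles).
plan = """== Physical Plan ==
Window [... AS col_w2_2#147L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(4) Sort [...], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [... AS col_w1_2#143L], [col1#88L], [...]
         +- *(3) Sort [...], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Window [... AS col_w2_1#145L], [col3#90L], [...]
                  +- *(2) Sort [...], false, 0
                     +- Exchange hashpartitioning(col3#90L, 200)
                        +- Window [... AS col_w1_1#141L], [col1#88L], [...]
                           +- *(1) Sort [...], false, 0
                              +- Exchange hashpartitioning(col1#88L, 200)
                                 +- Scan ExistingRDD[...]"""

print(count_shuffles(plan))  # -> 4
```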
select - 2 shuffles
If all columns are passed with `select`, the DAG is correct:
```python
df.select("*", *expr).explain()
```

```
== Physical Plan ==
Window [max(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_1#119L, sum(col5#92L) windowspecdefinition(col3#90L, col4#91L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w2_2#121L], [col3#90L], [col4#91L ASC NULLS FIRST]
+- *(2) Sort [col3#90L ASC NULLS FIRST, col4#91L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(col3#90L, 200)
      +- Window [max(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_1#115L, sum(col5#92L) windowspecdefinition(col1#88L, col2#89L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS col_w1_2#117L], [col1#88L], [col2#89L ASC NULLS FIRST]
         +- *(1) Sort [col1#88L ASC NULLS FIRST, col2#89L ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(col1#88L, 200)
               +- Scan ExistingRDD[col1#88L,col2#89L,col3#90L,col4#91L,col5#92L]
```
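Judging from the plans above, the extra shuffles seem to come from *alternating* the specs, so one plausible mitigation (a sketch, not verified across Spark versions) is to order the chained `withColumn` calls so that expressions sharing a window spec end up adjacent, mirroring the grouping that `select` achieves. The helper below is hypothetical and uses plain tuples to stand in for the real `(Window spec, Column)` pairs from the post:

```python
# Hypothetical stand-ins: each tuple is (window-spec id, new column name).
# With the real objects these would be (w1, col_w1_1), (w2, col_w2_1), ...
pairs = [("w1", "col_w1_1"), ("w2", "col_w2_1"),
         ("w1", "col_w1_2"), ("w2", "col_w2_2")]

# Sorting by spec id keeps same-spec expressions adjacent; sorted() is
# stable, so within a spec the original order is preserved.
order = [name for _, name in sorted(pairs, key=lambda p: p[0])]
print(order)  # -> ['col_w1_1', 'col_w1_2', 'col_w2_1', 'col_w2_2']

# Chaining withColumn in this order (sketch, not verified here):
# df.withColumn("col_w1_1", col_w1_1).withColumn("col_w1_2", col_w1_2)\
#   .withColumn("col_w2_1", col_w2_1).withColumn("col_w2_2", col_w2_2)
```

Whether this actually lets the optimizer collapse the adjacent same-spec Window operators down to one shuffle per spec is exactly what the question below is probing.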
Question
There is some information out there on why `withColumn` should be avoided, but it mainly concerns calling `withColumn` many times and does not address the issue of the diverging DAGs (see here and here). Does anyone know why the DAG differs between `withColumn` and `select`? Spark's optimization algorithms should apply in either case and should not depend on different ways of expressing exactly the same thing.

Thanks in advance.