使用.select()
时遇到了令人惊讶的行为:
>>> my_df.show()
+---+---+---+
| a| b| c|
+---+---+---+
| 1| 3| 5|
| 2| 4| 6|
+---+---+---+
>>> a_c = s_df.select(col("a"), col("c")) # removing column b
>>> a_c.show()
+---+---+
| a| c|
+---+---+
| 1| 5|
| 2| 6|
+---+---+
>>> a_c.filter(col("b") == 3).show() # I can still filter on "b"!
+---+---+
| a| c|
+---+---+
| 1| 5|
+---+---+
这种行为让我感到疑惑......以下几点是否正确?
DataFrame只是视图,简单的DataFrame就是它自己的视图。在我看来,a_c
只是对my_df
的看法。
当我创建a_c
时没有创建新数据,a_c
只是指向my_df
指向的相同数据。
如果有其他相关信息,请添加!
这是因为Spark的懒惰性质。足够“智能”将滤波器向下推,使其在滤波器*之前发生在较低的水平。所以,因为这一切都发生在相同的stage执行中,并且仍然可以解决。事实上你可以在explain
看到这个:
== Physical Plan ==
*Project [a#0, c#2]
+- *Filter (b#1 = 3) <---Filter before Project
+- LocalTableScan [A#0, B#1, C#2]
您可以强制进行随机播放和新阶段,然后查看您的过滤器失败。甚至在编译时捕获它。这是一个例子:
a_c.groupBy("a","c").count.filter(col("b") === 3)
*还有一个投影修剪,如果它意识到它在任何时候都不需要列,则将选择向下推送到数据库层。但是我相信过滤器会导致它“需要”而不是修剪......但我没有测试过。
让我们从一些关于基础的基础知识开始。这将使您的理解变得容易。 RDD:Spark核心的基础是称为RDD的数据结构,它被懒惰地评估。通过延迟评估,我们的意思是RDD计算在动作时发生(比如在RDD中调用计数或在数据集中显示)。
数据集或数据帧(数据集[行])也在核心使用RDD。
这意味着只有在触发动作(显示)时才会实现每个转换(如过滤器)。
所以你的问题“当我创建a_c时没有创建新数据,a_c只是指向my_df所指向的相同数据。”
因为没有实现的数据。我们必须意识到它将它带入记忆中。您的过滤器适用于初始数据帧。使a_c.filter(col("b") == 3).show()
抛出运行时异常的唯一方法是使用dataframe.cache缓存中间数据帧。所以spark会抛出“main”org.apache.spark.sql.AnalysisException:无法解析列名Eg。
val a_c = s_df.select(col("a"), col("c")).cache
a_c.filter(col("b") == 3).show()
所以spark会抛出“main”org.apache.spark.sql.AnalysisException:无法解析列名。