跨越分区的SparkSQL DataFrame顺序

问题描述 投票:5回答:2

我正在使用spark sql对我的数据集运行查询。查询的结果非常小但仍然是分区的。

我想合并生成的DataFrame并按列排序行。我试过了

DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")

我也试过了

DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")

输出文件以块的形式排序(即分区是有序的,但数据帧不作为整体排序)。例如,而不是

1, value
2, value
4, value
4, value
5, value
5, value
...

我明白了

2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value
  1. 获取查询结果的绝对排序的正确方法是什么?
  2. 为什么数据框不会合并为单个分区?
apache-spark apache-spark-sql spark-dataframe
2个回答
3
投票

我想在这里提几件事。 1-源代码显示orderBy语句在内部调用排序api,全局排序设置为true。因此,输出级别缺乏排序表明在写入目标时排序丢失。我的观点是,对orderBy的调用始终需要全局订单。

2-使用剧烈的聚结,如在你的情况下强制单个分区,可能是非常危险的。我建议你不要这样做。源代码表明调用coalesce(1)可能会导致上游转换使用单个分区。这将是残酷的表现。

3-您似乎希望orderBy语句可以使用单个分区执行。我不认为我同意这一说法。这将使Spark成为一个非常愚蠢的分布式框架。

如果您同意或不同意声明,请告知我们。

你是如何从输出中收集数据的呢?

也许输出实际上包含已排序的数据,但您为了从输出中读取而执行的转换/操作是导致订单丢失的原因。


1
投票

orderBy将在合并后生成新分区。要拥有单个输出分区,请重新排序操作...

DataFrame result = spark.sql("my sql").orderBy("col1").coalesce(1)
result.write.json("results.json")

正如@JavaPlanet所提到的,对于非常大的数据,您不希望合并到单个分区中。它将大大降低您的并行度。

© www.soinside.com 2019 - 2024. All rights reserved.