Pyspark-合并多个ORC模式

问题描述 投票:0回答:1

我有2个不同的目录,下面有一个ORC文件。这两个文件具有不同的架构。将两个目录读入同一DataFrame时,最终模式取决于路径的顺序。

请考虑以下代码来复制它:

data = [
    (1, "player1", "google.com", True),
    (2, "player1", "youtube.com", True),
    (3, "player2", "facebook.com", True),
    (4, "player2", "record.pt", True),
    (5, "player2", "yahoo.com", True),
    (6, "player3", "facebook.com", False),
    (7, "player3", "record.pt", True),
    (8, "player3", "yahoo.com", True),
    (9, "player4", "", True),
    (10, "player4", "record.pt", True),
    (11, "player4", "abola.pt", True),
    (12, "player4", None, True)
]

data2 = [
    (13, "player1", True),
    (14, "player2", True),
    (15, "player3", True),
    (16, "player4", True),
    (17, "player3", True),
    (18, "player3", True),
]

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(data, ["id", "splayer", "website", "bool"])
df2 = spark.createDataFrame(data2, ["id", "splayer", "bool"])

df1.coalesce(1).write.orc('temporary/bla=1', mode='overwrite')
df2.coalesce(1).write.orc('temporary/bla=2', mode='overwrite')

df = spark.read.option("mergeSchema", "true").option("basePath", "temporary").orc(['temporary/bla=2', 'temporary/bla=1'])

df.show()

这产生输出:

+---+-------+-----+---+                                                         
| id|splayer| bool|bla|
+---+-------+-----+---+
|  1|player1| true|  1|
|  2|player1| true|  1|
|  3|player2| true|  1|
|  4|player2| true|  1|
|  5|player2| true|  1|
|  6|player3|false|  1|
|  7|player3| true|  1|
|  8|player3| true|  1|
|  9|player4| true|  1|
| 10|player4| true|  1|
| 11|player4| true|  1|
| 12|player4| true|  1|
| 13|player1| true|  2|
| 14|player2| true|  2|
| 15|player3| true|  2|
| 16|player4| true|  2|
| 17|player3| true|  2|
| 18|player3| true|  2|
+---+-------+-----+---+

如果更改目录的顺序,将生成以下输出:

+---+-------+------------+-----+---+                                            
| id|splayer|     website| bool|bla|
+---+-------+------------+-----+---+
|  1|player1|  google.com| true|  1|
|  2|player1| youtube.com| true|  1|
|  3|player2|facebook.com| true|  1|
|  4|player2|   record.pt| true|  1|
|  5|player2|   yahoo.com| true|  1|
|  6|player3|facebook.com|false|  1|
|  7|player3|   record.pt| true|  1|
|  8|player3|   yahoo.com| true|  1|
|  9|player4|            | true|  1|
| 10|player4|   record.pt| true|  1|
| 11|player4|    abola.pt| true|  1|
| 12|player4|        null| true|  1|
| 13|player1|        null| true|  2|
| 14|player2|        null| true|  2|
| 15|player3|        null| true|  2|
| 16|player4|        null| true|  2|
| 17|player3|        null| true|  2|
| 18|player3|        null| true|  2|
+---+-------+------------+-----+---+

[研究此问题时,我发现了几篇文章指出option("mergeSchema", "true")将是一个解决方案。实际上,有一个pull request

是否有解决方案,还是尚待解决的问题?

我正在使用(Py)Spark 2.4.3和Python 3.6.8。

谢谢您!

UPDATE

上述PR仅适用于Spark 3.0.0。感谢您提供@Shaido的信息。

python apache-spark pyspark pyspark-sql orc
1个回答
0
投票

由于从某些供应商数据进行架构演变,因此我遇到了同样的问题。我一直在尝试一些不同的想法,因为在Spark 3.0之前ORC mergeSchema选项不可用,而我们正在运行2.3我的第一个想法是用完整的架构(包括所有新列)创建一个空的数据框,并将其作为ORC文件保存到按字母顺序顺序排列的目录中。例如,如果我的数据按load_date进行分区,那么我将拥有诸如load_date = 00000000,load_date = 20200501,load_date = 20200601等文件夹。这行得通,但还不是很干净,我也不相信ORC读取器不会以某种方式选择其他ORC文件作为架构的基础。因此,我想到了只向ORC阅读器提供一个包含我需要的所有列的架构,并且可以正常工作。

schema = StructType([StructField('state', StringType(), True), StructField('new_col_middle', StringType(), True), StructField('abbr', StringType(), False), StructField('population', IntegerType(), False), StructField('new_col2', StringType(), False)])
df = spark.read.schema(schema).orc('/data/sandbox/orc_schema_evolution/')

在HDFS的orc_schema_evolution文件夹中,我们具有分区的load_date文件夹,其中一些ORC文件具有架构(“状态”,“人口”),而其他文件具有架构(“状态”,“人口”,“ abbr”)。注意,我什至可以使用这种方法重新排列现有列的顺序。

© www.soinside.com 2019 - 2024. All rights reserved.