我有2个不同的目录,下面有一个ORC文件。这两个文件具有不同的架构。将两个目录读入同一DataFrame时,最终模式取决于路径的顺序。
请考虑以下代码来复制它:
data = [
(1, "player1", "google.com", True),
(2, "player1", "youtube.com", True),
(3, "player2", "facebook.com", True),
(4, "player2", "record.pt", True),
(5, "player2", "yahoo.com", True),
(6, "player3", "facebook.com", False),
(7, "player3", "record.pt", True),
(8, "player3", "yahoo.com", True),
(9, "player4", "", True),
(10, "player4", "record.pt", True),
(11, "player4", "abola.pt", True),
(12, "player4", None, True)
]
data2 = [
(13, "player1", True),
(14, "player2", True),
(15, "player3", True),
(16, "player4", True),
(17, "player3", True),
(18, "player3", True),
]
spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame(data, ["id", "splayer", "website", "bool"])
df2 = spark.createDataFrame(data2, ["id", "splayer", "bool"])
df1.coalesce(1).write.orc('temporary/bla=1', mode='overwrite')
df2.coalesce(1).write.orc('temporary/bla=2', mode='overwrite')
df = spark.read.option("mergeSchema", "true").option("basePath", "temporary").orc(['temporary/bla=2', 'temporary/bla=1'])
df.show()
这产生输出:
+---+-------+-----+---+
| id|splayer| bool|bla|
+---+-------+-----+---+
| 1|player1| true| 1|
| 2|player1| true| 1|
| 3|player2| true| 1|
| 4|player2| true| 1|
| 5|player2| true| 1|
| 6|player3|false| 1|
| 7|player3| true| 1|
| 8|player3| true| 1|
| 9|player4| true| 1|
| 10|player4| true| 1|
| 11|player4| true| 1|
| 12|player4| true| 1|
| 13|player1| true| 2|
| 14|player2| true| 2|
| 15|player3| true| 2|
| 16|player4| true| 2|
| 17|player3| true| 2|
| 18|player3| true| 2|
+---+-------+-----+---+
如果更改目录的顺序,将生成以下输出:
+---+-------+------------+-----+---+
| id|splayer| website| bool|bla|
+---+-------+------------+-----+---+
| 1|player1| google.com| true| 1|
| 2|player1| youtube.com| true| 1|
| 3|player2|facebook.com| true| 1|
| 4|player2| record.pt| true| 1|
| 5|player2| yahoo.com| true| 1|
| 6|player3|facebook.com|false| 1|
| 7|player3| record.pt| true| 1|
| 8|player3| yahoo.com| true| 1|
| 9|player4| | true| 1|
| 10|player4| record.pt| true| 1|
| 11|player4| abola.pt| true| 1|
| 12|player4| null| true| 1|
| 13|player1| null| true| 2|
| 14|player2| null| true| 2|
| 15|player3| null| true| 2|
| 16|player4| null| true| 2|
| 17|player3| null| true| 2|
| 18|player3| null| true| 2|
+---+-------+------------+-----+---+
[研究此问题时,我发现了几篇文章指出option("mergeSchema", "true")
将是一个解决方案。实际上,有一个pull request。
是否有解决方案,还是尚待解决的问题?
我正在使用(Py)Spark 2.4.3和Python 3.6.8。
谢谢您!
UPDATE:
上述PR仅适用于Spark 3.0.0。感谢您提供@Shaido的信息。
由于从某些供应商数据进行架构演变,因此我遇到了同样的问题。我一直在尝试一些不同的想法,因为在Spark 3.0之前ORC mergeSchema选项不可用,而我们正在运行2.3我的第一个想法是用完整的架构(包括所有新列)创建一个空的数据框,并将其作为ORC文件保存到按字母顺序顺序排列的目录中。例如,如果我的数据按load_date进行分区,那么我将拥有诸如load_date = 00000000,load_date = 20200501,load_date = 20200601等文件夹。这行得通,但还不是很干净,我也不相信ORC读取器不会以某种方式选择其他ORC文件作为架构的基础。因此,我想到了只向ORC阅读器提供一个包含我需要的所有列的架构,并且可以正常工作。
schema = StructType([StructField('state', StringType(), True), StructField('new_col_middle', StringType(), True), StructField('abbr', StringType(), False), StructField('population', IntegerType(), False), StructField('new_col2', StringType(), False)])
df = spark.read.schema(schema).orc('/data/sandbox/orc_schema_evolution/')
在HDFS的orc_schema_evolution文件夹中,我们具有分区的load_date文件夹,其中一些ORC文件具有架构(“状态”,“人口”),而其他文件具有架构(“状态”,“人口”,“ abbr”)。注意,我什至可以使用这种方法重新排列现有列的顺序。