I want to union two DataFrames whose schemas (possibly) don't match:
org.apache.spark.sql.DataFrame = [name: string, age: int, height: int]
org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> A.unionAll(B)
results in:
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the same number of columns, but the left table has 2 columns and the right has 3;
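I would like to do this from within Spark. However, the Spark docs only suggest writing both DataFrames out to a directory and reading them back with spark.read.option("mergeSchema", "true").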
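So union doesn't help me, and neither do the docs. If possible, I'd like to keep this extra I/O out of my job. Am I missing some undocumented information, or is this not possible (yet)?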
You can append null columns to frame B and then union the two frames:
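import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Fields that A has but B lacks (compared by name)
val missingFields = A.schema.filterNot(f => B.columns.contains(f.name))
// Start from B and add each missing field as a typed null column
var C: DataFrame = B
for (field <- missingFields) {
  C = C.withColumn(field.name, lit(null).cast(field.dataType))
}
// unionAll matches columns by position, so put C's columns in A's order
A.unionAll(C.select(A.columns.map(col): _*))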
Parquet schema merging is disabled by default. Enable it in one of the following ways:
(1) set global option: spark.sql.parquet.mergeSchema=true
(2) write code: sqlContext.read.option("mergeSchema", "true").parquet("my.parquet")
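Here is a pyspark solution.
It assumes that if the union fails because one DataFrame is missing a column that the other contains, the right thing to do is to add the missing column filled with nulls.
On the other hand, if the union fails because the two DataFrames share a column with conflicting types or nullability, the right thing to do is to raise a TypeError (since that is probably a conflict you want to know about).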
from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not(l_nullable)))
        # Column exists only on the left: add it to the right as a typed null
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable)))
        # Column exists only on the right: add it to the left as a typed null
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

    # union() matches columns by position, so align the column order first
    return df_left.union(df_right.select(df_left.columns))
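For anyone who wants to try the decision rule described above without starting a Spark session, here is a minimal pure-Python sketch. It is not part of the answer's code: `plan_null_columns` is a hypothetical helper, and each schema is represented as a plain list of (name, type, nullable) tuples standing in for the StructField objects of a real DataFrame. It only decides which null columns each side would need, or raises TypeError on a conflict.

```python
def plan_null_columns(left_schema, right_schema):
    """Return (add_to_right, add_to_left) as lists of (name, type) pairs.

    Each schema is a list of (name, type, nullable) tuples, a plain-Python
    stand-in (an assumption for this sketch) for a DataFrame's StructFields.
    Raises TypeError when a shared column differs in type or nullability.
    """
    left = {name: (dtype, nullable) for name, dtype, nullable in left_schema}
    right = {name: (dtype, nullable) for name, dtype, nullable in right_schema}

    # Columns present on both sides must agree exactly.
    for name in left.keys() & right.keys():
        (l_type, l_null), (r_type, r_null) = left[name], right[name]
        if l_type != r_type:
            raise TypeError("Type conflict on field %s: left %s, right %s"
                            % (name, l_type, r_type))
        if l_null != r_null:
            raise TypeError("Nullability conflict on field %s: left %s, right %s"
                            % (name, l_null, r_null))

    # Columns present on one side only become null columns on the other side.
    add_to_right = [(n, t) for n, t, _ in left_schema if n not in right]
    add_to_left = [(n, t) for n, t, _ in right_schema if n not in left]
    return add_to_right, add_to_left


# The two schemas from the question
a = [("name", "string", True), ("age", "int", True), ("height", "int", True)]
b = [("name", "string", True), ("age", "int", True)]
print(plan_null_columns(a, b))  # → ([('height', 'int')], [])
```

With the schemas from the question, only `height` has to be added to the right-hand frame, and nothing has to be added to the left.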
Thanks @conradlee! I modified your solution to allow the union by adding casts and removing the nullability check. It works for me.
from pyspark.sql import functions as F

def harmonize_schemas_and_combine(df_left, df_right):
    '''
    df_left is the main df; we try to append the new df_right to it.
    Need to do three things here:
    1. Set other claim/clinical features to NULL
    2. Align schemas (data types)
    3. Align column orders
    '''
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType) for f in df_left.schema)
    right_fields = set((f.name, f.dataType) for f in df_right.schema)
    # import pdb; pdb.set_trace() #pdb debugger

    # I. First go over left-unique fields:
    # For columns in the main df, but not in the new df: add it as Null
    # For columns in both df but w/ different datatypes, use casting to keep them consistent w/ main df (Left)
    for l_name, l_type in left_fields.difference(right_fields):  #1. find what Left has, Right doesn't
        if l_name in right_types:  #2A. if column is in both, then something's off w/ the schema
            r_type = right_types[l_name]  #3. tell me what's this column's type in Right
            df_right = df_right.withColumn(l_name, df_right[l_name].cast(l_type))  #4. keep them consistent w/ main df (Left)
            print("Casting magic happened on column %s: Left type: %s, Right type: %s. Both are now: %s." % (l_name, l_type, r_type, l_type))
        else:  #2B. if Left column is not in Right, add a NULL column to Right df
            df_right = df_right.withColumn(l_name, F.lit(None).cast(l_type))

    # Make sure Right columns are in the same order of Left
    # (this also drops any columns that exist only in Right)
    df_right = df_right.select(df_left.columns)
    return df_left.union(df_right)
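Here is another solution. I use an RDD union because the DataFrame union operation does not support multiple DataFrames.
Note: this should not be used to merge many DataFrames with different schemas. The cost of adding null columns to the DataFrames will quickly lead to out-of-memory errors (i.e. trying to merge 1000 DataFrames, each missing 10 columns, results in 10,000 transformations).
If your use case is reading a DataFrame from storage that is made up of multiple paths with different schemas, a better option is to save your data as Parquet first and then use the 'mergeSchema' option when reading the DataFrame.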
import logging

from pyspark.sql import functions as func

def unionDataFramesAndMergeSchema(spark, dfsList):
    '''
    This function can perform a union between x dataFrames with different schemas.
    Non-existing columns will be filled with null.
    Note: If a column exist in 2 dataFrames with different types, an exception will be thrown.
    :example:
    >>> df1 = spark.createDataFrame([
    >>> {
    >>>     'A': 1,
    >>>     'B': 1,
    >>>     'C': 1
    >>> }])
    >>> df2 = spark.createDataFrame([
    >>> {
    >>>     'A': 2,
    >>>     'C': 2,
    >>>     'DNew' : 2
    >>> }])
    >>> unionDataFramesAndMergeSchema(spark,[df1,df2]).show()
    >>> +---+----+---+----+
    >>> |  A|   B|  C|DNew|
    >>> +---+----+---+----+
    >>> |  2|null|  2|   2|
    >>> |  1|   1|  1|null|
    >>> +---+----+---+----+
    :param spark: The Spark session.
    :param dfsList: A list of dataFrames.
    :return: A union of all dataFrames, with schema merged.
    '''
    if len(dfsList) == 0:
        raise ValueError("DataFrame list is empty.")
    if len(dfsList) == 1:
        logging.info("The list contains only one dataFrame, no need to perform union.")
        return dfsList[0]

    logging.info("Will perform union between {0} dataFrames...".format(len(dfsList)))

    # Collect every column name with its type; fail if two frames disagree on a type
    columnNamesAndTypes = {}
    logging.info("Calculating unified column names and types...")
    for df in dfsList:
        for columnName, columnType in dict(df.dtypes).items():
            if columnName in columnNamesAndTypes and columnNamesAndTypes[columnName] != columnType:
                raise ValueError(
                    "column '{0}' exist in at least 2 dataFrames with different types ('{1}' and '{2}')"
                    .format(columnName, columnType, columnNamesAndTypes[columnName]))
            columnNamesAndTypes[columnName] = columnType
    logging.info("Unified column names and types: {0}".format(columnNamesAndTypes))

    # Add a typed null column for every column a frame is missing
    logging.info("Adding null columns in dataFrames if needed...")
    newDfsList = []
    for df in dfsList:
        newDf = df
        dfTypes = dict(df.dtypes)
        for columnName, columnType in columnNamesAndTypes.items():
            if columnName not in dfTypes:
                # logging.info("Adding null column for '{0}'.".format(columnName))
                newDf = newDf.withColumn(columnName, func.lit(None).cast(columnType))
        newDfsList.append(newDf)

    # Give every frame the same column order before the positional union
    dfsWithOrderedColumnsList = [df.select(list(columnNamesAndTypes.keys())) for df in newDfsList]
    logging.info("Performing a flat union between all dataFrames (as rdds)...")
    allRdds = spark.sparkContext.union([df.rdd for df in dfsWithOrderedColumnsList])
    return allRdds.toDF()
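To get a feel for the cost mentioned in the note above before running anything on a cluster, the schema-unification step can be sketched in plain Python. This is only an illustration, not the function itself: `unify_dtypes` is a hypothetical helper, and each frame is represented by a list of (name, type) pairs shaped like the output of `df.dtypes` (the `bigint` type strings are stand-ins chosen for the sketch). It returns the unified column map plus, per frame, the columns that would each need a `withColumn` call.

```python
from collections import OrderedDict


def unify_dtypes(dtypes_per_frame):
    """Merge (name, type) lists into one ordered mapping.

    Returns (unified, missing), where missing[i] lists the columns that
    frame i lacks. Raises ValueError if two frames disagree on a type.
    """
    unified = OrderedDict()
    for dtypes in dtypes_per_frame:
        for name, dtype in dtypes:
            if name in unified and unified[name] != dtype:
                raise ValueError(
                    "column '{0}' has conflicting types ('{1}' and '{2}')"
                    .format(name, dtype, unified[name]))
            # Keep the position of the first occurrence of each column
            unified.setdefault(name, dtype)

    missing = []
    for dtypes in dtypes_per_frame:
        present = dict(dtypes)
        missing.append([name for name in unified if name not in present])
    return unified, missing


# Same columns as df1 and df2 in the docstring example
frames = [
    [("A", "bigint"), ("B", "bigint"), ("C", "bigint")],
    [("A", "bigint"), ("C", "bigint"), ("DNew", "bigint")],
]
unified, missing = unify_dtypes(frames)
print(list(unified))                 # → ['A', 'B', 'C', 'DNew']
print(missing)                       # → [['DNew'], ['B']]
print(sum(len(m) for m in missing))  # → 2
```

The last number is how many null columns would have to be added in total. For 1000 frames that each miss 10 columns it comes out at 10,000, which is where the warning above comes from.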