I have a Spark dataframe (`df1`) with a specific schema, and I have another dataframe with the same columns but a different schema. I know how to do this column by column, but since I have a large set of columns that would be quite long. To keep schemas consistent across dataframes, I would like to know whether it is possible to apply one dataframe's schema to another, or to create a function that does the job.
Here is an example:
df1
# root
# |-- A: date (nullable = true)
# |-- B: integer (nullable = true)
# |-- C: string (nullable = true)
df2
# root
# |-- A: string (nullable = true)
# |-- B: string (nullable = true)
# |-- C: string (nullable = true)
I would like to copy `df1`'s schema and apply it to `df2`.

I tried this approach for a single column. Given the large number of columns I have, it would be a rather lengthy way to do it.
df2 = df2.withColumn("B", df2["B"].cast('int'))
Yes, this can be done dynamically using `dataframe.schema.fields`:
df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])
Example:
from pyspark.sql.functions import *
df1 = spark.createDataFrame([('2022-02-02',2,'a')],['A','B','C']).withColumn("A",to_date(col("A")))
print("df1 Schema")
df1.printSchema()
#df1 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)
df2 = spark.createDataFrame([('2022-02-02','2','a')],['A','B','C'])
print("df2 Schema")
df2.printSchema()
#df2 Schema
#root
# |-- A: string (nullable = true)
# |-- B: string (nullable = true)
# |-- C: string (nullable = true)
#
#casting the df2 columns by getting df1 schema using select clause
df3 = df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])
df3.show(10,False)
print("df3 Schema")
df3.printSchema()
#+----------+---+---+
#|A |B |C |
#+----------+---+---+
#|2022-02-02|2 |a |
#+----------+---+---+
#df3 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)
In this example, `df1` is defined with date, long, and string types, while `df2` is defined with string types only. `df3` is created from `df2` as the source data, with `df1`'s schema applied.
Try this -

Input dataframes
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
data1 = [("2022-01-01", 1, "A"),
("2022-01-02", 2, "B"),
("2022-01-03", 3, "C")
]
data1 = [(datetime.strptime(date_str, "%Y-%m-%d"), b, c) for date_str, b, c in data1]
schema1 = StructType([StructField("A", DateType(), True),
StructField("B", IntegerType(), True),
StructField("C", StringType(), True)
]
)
df1 = spark.createDataFrame(data1, schema=schema1)
df1.printSchema()
data2 = [("2022-01-04", "4", "D"),
("2022-01-05", "5", "E"),
("2022-01-06", "6", "F")
]
schema2 = StructType([StructField("A", StringType(), True),
StructField("B", StringType(), True),
StructField("C", StringType(), True)
]
)
df2 = spark.createDataFrame(data2, schema=schema2)
df2.printSchema()
# Note: applying df1.schema to df2.rdd directly would fail schema
# verification, because the RDD still holds strings where df1 expects
# date/integer values. Cast the columns to df1's types first:
df2 = df2.select([col(f.name).cast(f.dataType) for f in df1.schema.fields])
df2 = spark.createDataFrame(data=df2.rdd, schema=df1.schema)
df2.printSchema()
root
|-- A: date (nullable = true)
|-- B: integer (nullable = true)
|-- C: string (nullable = true)
Alternatively, if you need a more generic solution, create a method -
def apply_schema(df1, df2):
    schema1 = df1.schema
    schema2 = df2.schema
    # Map each column name in df1 to its data type
    data_types = {field.name: field.dataType for field in schema1.fields}
    # Cast every df2 column that also exists in df1 to df1's type
    for field in schema2.fields:
        column_name = field.name
        if column_name in data_types:
            column_type = data_types[column_name]
            df2 = df2.withColumn(column_name, df2[column_name].cast(column_type))
    return df2
And use this method to impose df1's schema on df2 -
df2 = apply_schema(df1, df2)
print("Schema of df1:")
df1.printSchema()
print("Schema of df2:")
df2.printSchema()
df2.show()
Schema of df1:
root
|-- A: date (nullable = true)
|-- B: integer (nullable = true)
|-- C: string (nullable = true)
Schema of df2:
root
|-- A: date (nullable = true)
|-- B: integer (nullable = true)
|-- C: string (nullable = true)
+----------+---+---+
| A| B| C|
+----------+---+---+
|2022-01-04| 4| D|
|2022-01-05| 5| E|
|2022-01-06| 6| F|
+----------+---+---+
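One property of `apply_schema` worth noting: it only casts columns whose names appear in `df1`, so any extra column in `df2` passes through unchanged. A pure-Python sketch of that matching logic, with no Spark session needed (the type names below are illustrative stand-ins for `df1.schema.fields`, not real Spark type objects):

```python
# Stand-in for {field.name: field.dataType for field in df1.schema.fields}
df1_types = {"A": "date", "B": "int", "C": "string"}

# df2 has an extra column D that df1 does not know about
df2_cols = ["A", "B", "C", "D"]

# Only columns present in df1 get a cast; D is left untouched
casts = {c: df1_types[c] for c in df2_cols if c in df1_types}
print(casts)  # {'A': 'date', 'B': 'int', 'C': 'string'}
```

If you instead want to drop columns that are not in `df1`, use a `select` over `df1`'s fields (as in the first answer) rather than the `withColumn` loop.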
We can also use `dtypes`:
df2.select([F.col(c).cast(t) for c, t in df1.dtypes])
Full example:
from pyspark.sql import functions as F
df1 = spark.createDataFrame([('1', '1.1')], ['col1', 'col2'])
df1.printSchema()
# root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
df2 = spark.createDataFrame([(1, 1.1)], ['col1', 'col2'])
df2.printSchema()
# root
# |-- col1: long (nullable = true)
# |-- col2: double (nullable = true)
df2 = df2.select([F.col(c).cast(t) for c, t in df1.dtypes])
df2.printSchema()
# root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
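Since `dtypes` returns plain `(column_name, type_string)` pairs, the same cast can equivalently be written as SQL strings and passed to `selectExpr`. A minimal sketch (the `dtypes` list is hard-coded here to mirror `df1.dtypes` from the example above):

```python
# Shape of df1.dtypes: a list of (column_name, simple_type_string) tuples
dtypes = [('col1', 'string'), ('col2', 'string')]

# Build one CAST expression per column; backticks guard column names
# that contain spaces or other special characters
exprs = [f"CAST(`{c}` AS {t}) AS `{c}`" for c, t in dtypes]
print(exprs)
# ['CAST(`col1` AS string) AS `col1`', 'CAST(`col2` AS string) AS `col2`']

# With a SparkSession this becomes: df2 = df2.selectExpr(*exprs)
```

This is purely a stylistic alternative; `F.col(c).cast(t)` and the `CAST(...)` SQL string compile to the same expression.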