Copy the schema from one dataframe to another

Problem description:

I have a Spark dataframe (df1) with a particular schema, and I have another dataframe with the same columns but a different schema. I know how to do this column by column, but since I have a large set of columns it would be very long. To keep schemas consistent across dataframes, I would like to know whether it is possible to apply one dataframe's schema to another, or to create a function that does the job.

Here is an example:

df1
# root
#  |-- A: date (nullable = true)
#  |-- B: integer (nullable = true)
#  |-- C: string (nullable = true)

df2
# root
#  |-- A: string (nullable = true)
#  |-- B: string (nullable = true)
#  |-- C: string (nullable = true)

I would like to copy the schema of df1 and apply it to df2.

I tried the following approach on a single column. Given the large number of columns I have, this would be a rather lengthy way to do it.

df2 = df2.withColumn("B", df2["B"].cast('int'))
python apache-spark pyspark schema pyspark-schema
3 Answers

3 votes

Yes, it can be done dynamically using dataframe.schema.fields:

df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])

Example:

from pyspark.sql.functions import *
df1 = spark.createDataFrame([('2022-02-02',2,'a')],['A','B','C']).withColumn("A",to_date(col("A")))
print("df1 Schema")
df1.printSchema()
#df1 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)

df2 = spark.createDataFrame([('2022-02-02','2','a')],['A','B','C'])
print("df2 Schema")
df2.printSchema()
#df2 Schema
#root
# |-- A: string (nullable = true)
# |-- B: string (nullable = true)
# |-- C: string (nullable = true)
#

#casting the df2 columns by getting df1 schema using select clause
df3 = df2.select(*[(col(x.name).cast(x.dataType)) for x in df1.schema.fields])
df3.show(10,False)
print("df3 Schema")
df3.printSchema()

#+----------+---+---+
#|A         |B  |C  |
#+----------+---+---+
#|2022-02-02|2  |a  |
#+----------+---+---+

#df3 Schema
#root
# |-- A: date (nullable = true)
# |-- B: long (nullable = true)
# |-- C: string (nullable = true)

In this example, df1 is defined with date, long, and string types, while df2 is defined with string types only. df3 is built with df2 as the source data and the schema of df1 applied to it.
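
If the two dataframes do not share exactly the same set of columns, a small variant of the same select can cast only the overlapping columns and pass the others through unchanged. A minimal sketch (conform_to_schema is a hypothetical helper name, not part of the answer above):

from pyspark.sql.functions import col

def conform_to_schema(target_df, reference_df):
    # look up the reference type for each column name
    ref_types = {f.name: f.dataType for f in reference_df.schema.fields}
    # cast columns that exist in the reference; keep the rest as-is
    return target_df.select(
        *[col(c).cast(ref_types[c]) if c in ref_types else col(c)
          for c in target_df.columns]
    )

df3 = conform_to_schema(df2, df1)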


2 votes

Try this -

Input dataframes:

from pyspark.sql.functions import *
from pyspark.sql.types import *

from datetime import datetime

data1 = [("2022-01-01", 1, "A"),
         ("2022-01-02", 2, "B"),
         ("2022-01-03", 3, "C")
        ]

data1 = [(datetime.strptime(date_str, "%Y-%m-%d"), b, c) for date_str, b, c in data1]

schema1 = StructType([StructField("A", DateType(), True),
                      StructField("B", IntegerType(), True),
                      StructField("C", StringType(), True)
                     ]
                    )

df1 = spark.createDataFrame(data1, schema=schema1)

df1.printSchema()

data2 = [("2022-01-04", "4", "D"),
         ("2022-01-05", "5", "E"),
         ("2022-01-06", "6", "F")
        ]
schema2 = StructType([StructField("A", StringType(), True),
                      StructField("B", StringType(), True),
                      StructField("C", StringType(), True)
                     ]
                    )
df2 = spark.createDataFrame(data2, schema=schema2)

df2.printSchema()
# recreate df2 from its RDD, imposing the schema of df1
df2 = spark.createDataFrame(data=df2.rdd, schema=df1.schema)
df2.printSchema()

root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)
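
One difference from the cast-based approaches: because the dataframe is rebuilt with df1.schema itself, df2 ends up carrying that schema verbatim, nullability flags included. A quick check, assuming the df1 and df2 defined above:

# the recreated df2 carries df1's schema object verbatim
print(df2.schema == df1.schema)  # True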

Alternatively, if you need a more generic solution, you can create a function -

def apply_schema(df1, df2):
    # map each column name in df1 to its data type
    data_types = {field.name: field.dataType for field in df1.schema.fields}

    # cast every df2 column that also exists in df1 to df1's type
    for field in df2.schema.fields:
        column_name = field.name
        if column_name in data_types:
            column_type = data_types[column_name]
            df2 = df2.withColumn(column_name, df2[column_name].cast(column_type))

    return df2

And use this function to impose the schema of df1 on df2 -

df2 = apply_schema(df1, df2)

print("Schema of df1:")
df1.printSchema()

print("Schema of df2:")
df2.printSchema()

df2.show()

Schema of df1:
root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

Schema of df2:
root
 |-- A: date (nullable = true)
 |-- B: integer (nullable = true)
 |-- C: string (nullable = true)

+----------+---+---+
|         A|  B|  C|
+----------+---+---+
|2022-01-04|  4|  D|
|2022-01-05|  5|  E|
|2022-01-06|  6|  F|
+----------+---+---+

0 votes

We can also use dtypes, which yields (column, type) pairs where the type is a plain string that cast accepts directly:

df2.select([F.col(c).cast(t) for c, t in df1.dtypes])

Full example:

from pyspark.sql import functions as F

df1 = spark.createDataFrame([('1', '1.1')], ['col1', 'col2'])
df1.printSchema()
# root
#  |-- col1: string (nullable = true)
#  |-- col2: string (nullable = true)

df2 = spark.createDataFrame([(1, 1.1)], ['col1', 'col2'])
df2.printSchema()
# root
#  |-- col1: long (nullable = true)
#  |-- col2: double (nullable = true)

df2 = df2.select([F.col(c).cast(t) for c, t in df1.dtypes])
df2.printSchema()
# root
#  |-- col1: string (nullable = true)
#  |-- col2: string (nullable = true)
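
For reference, dtypes returns each column's type as a plain string name (e.g. 'string', 'bigint'); printing it for the df1 above gives:

print(df1.dtypes)
# [('col1', 'string'), ('col2', 'string')]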