Pyspark 中的 AutoCast 数据类型

问题描述 投票:0回答:1

我正在从 PostgresSQL 创建一个 parquet 文件,它的所有内容都标记为 varchar 列。一旦我们在 ADLS 中拥有文件,我们希望使用 Python/Pyspark 根据 Azure databricks 中的日期、整数、varchar 字段等数据来转换数据类型。相同的代码将被具有不同架构的多个文件使用,因此需要一个通用的过程。 所以想要一种自动投射字段的方法

python pyspark casting azure-databricks
1个回答
0
投票

我尝试根据日期、整数、Varchar 字段等数据,使用 pyspark 重现和 Auto Cast 方法来 CAST 所有 Varchar 数据类型。

例如,我使用字符串数据类型创建了数据和列,并将它们以 Parquet 格式保存在 ADLS 中。

schema = StructType([
StructField("name", StringType(), True),
StructField("birth_date", StringType(), True),
StructField("age", StringType(), True),
StructField("score", StringType(), True)
])
data = [("John", "1990-01-01", "123", "12.34"),
("Alice", "1995-05-15", "456", "56.78")]

enter image description here

enter image description here

下面的代码将对数据类型Date、Integer、Varchar

进行自动转换

enter image description here

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
spark = SparkSession.builder.appName("AutoCastParquet").getOrCreate()
Parquet_file_path = "abfss://[email protected]/parquet_sample.parquet"
parquet_df = spark.read.parquet(Parquet_file_path)
casting_functions = {
    "string": lambda col_name: col(col_name),  
    "date": to_date,
    "timestamp": to_timestamp,
    "integer": lambda col_name: col(col_name).cast("integer"),
    "double": lambda col_name: col(col_name).cast("double"),
}
for column_name, data_type in parquet_df.dtypes:
    if "string" in data_type.lower():
        cast_func = casting_functions.get(data_type.lower())
        if cast_func:
            parquet_df = parquet_df.withColumn(column_name, cast_func(column_name))
parquet_df = parquet_df.withColumn(
    "birth_date",
    when(
        to_date(col("birth_date"), "yyyy-MM-dd").isNotNull(),
        to_date(col("birth_date"), "yyyy-MM-dd")
    ).otherwise(None)
)
parquet_df = parquet_df.withColumn(
    "age",
    when(
        col("age").cast("integer").isNotNull(),
        col("age").cast("integer")
    ).otherwise(None)
)
parquet_df = parquet_df.withColumn(
    "score",
    when(
        col("score").cast("double").isNotNull(),
        col("score").cast("double")
    ).otherwise(None)
)
new_schema = StructType([
    StructField("name", StringType(), True),
    StructField("birth_date", DateType(), True),
    StructField("age", IntegerType(), True),
    StructField("score", DoubleType(), True)
])
parquet_df = spark.createDataFrame(parquet_df.rdd, new_schema)
parquet_df.printSchema()
display(parquet_df)

enter image description here

上面的代码将 Parquet 文件从 Azure Data Lake Storage 读取到 Spark DataFrame 中,自动识别 StringType 列,并使用预定义的转换函数将它们转换为其推断的数据类型。生成的 DataFrame,具有更正的数据类型。

© www.soinside.com 2019 - 2024. All rights reserved.