我正在从 PostgresSQL 创建一个 parquet 文件,它的所有内容都标记为 varchar 列。一旦我们在 ADLS 中拥有文件,我们希望使用 Python/Pyspark 根据 Azure databricks 中的日期、整数、varchar 字段等数据来转换数据类型。相同的代码将被具有不同架构的多个文件使用,因此需要一个通用的过程。 所以想要一种自动投射字段的方法
我尝试根据日期、整数、Varchar 字段等数据,使用 pyspark 重现和 Auto Cast 方法来 CAST 所有 Varchar 数据类型。
例如,我使用字符串数据类型创建了数据和列,并将它们以 Parquet 格式保存在 ADLS 中。
schema = StructType([
StructField("name", StringType(), True),
StructField("birth_date", StringType(), True),
StructField("age", StringType(), True),
StructField("score", StringType(), True)
])
data = [("John", "1990-01-01", "123", "12.34"),
("Alice", "1995-05-15", "456", "56.78")]
下面的代码将对数据类型Date、Integer、Varchar
进行自动转换from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, to_timestamp, when
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, DoubleType
spark = SparkSession.builder.appName("AutoCastParquet").getOrCreate()
Parquet_file_path = "abfss://[email protected]/parquet_sample.parquet"
parquet_df = spark.read.parquet(Parquet_file_path)
casting_functions = {
"string": lambda col_name: col(col_name),
"date": to_date,
"timestamp": to_timestamp,
"integer": lambda col_name: col(col_name).cast("integer"),
"double": lambda col_name: col(col_name).cast("double"),
}
for column_name, data_type in parquet_df.dtypes:
if "string" in data_type.lower():
cast_func = casting_functions.get(data_type.lower())
if cast_func:
parquet_df = parquet_df.withColumn(column_name, cast_func(column_name))
parquet_df = parquet_df.withColumn(
"birth_date",
when(
to_date(col("birth_date"), "yyyy-MM-dd").isNotNull(),
to_date(col("birth_date"), "yyyy-MM-dd")
).otherwise(None)
)
parquet_df = parquet_df.withColumn(
"age",
when(
col("age").cast("integer").isNotNull(),
col("age").cast("integer")
).otherwise(None)
)
parquet_df = parquet_df.withColumn(
"score",
when(
col("score").cast("double").isNotNull(),
col("score").cast("double")
).otherwise(None)
)
new_schema = StructType([
StructField("name", StringType(), True),
StructField("birth_date", DateType(), True),
StructField("age", IntegerType(), True),
StructField("score", DoubleType(), True)
])
parquet_df = spark.createDataFrame(parquet_df.rdd, new_schema)
parquet_df.printSchema()
display(parquet_df)
上面的代码将 Parquet 文件从 Azure Data Lake Storage 读取到 Spark DataFrame 中,自动识别 StringType 列,并使用预定义的转换函数将它们转换为其推断的数据类型。生成的 DataFrame,具有更正的数据类型。