我创建了一个 pyspark 脚本来通过 Dataproc 将数据从 PG DB 迁移到 Google Bigquery,但是在 dataproc 上运行日志时遇到错误
引起:java.lang.NullPointerException:java.lang.IllegalStateException:意外类型:DecimalType(10,0)
Dataframe Schema
|-- log_id: long (nullable = true)
|-- prediction_date: timestamp (nullable = true)
|-- fk_process_id: long (nullable = true)
|-- record_id: long (nullable = true)
|-- model_output_json: string (nullable = true)
|-- predicted_ban: long (nullable = true)
和
Destination Table schema
log_id NUMERIC NULLABLE
prediction_date DATETIME NULLABLE
fk_process_id NUMERIC NULLABLE
record_id NUMERIC NULLABLE
model_output_json JSON NULLABLE
predicted_ban NUMERIC NULLABLE
我的理解是,long 和 NUMERIC 数据类型之间存在数据类型不匹配,因此我尝试将 Long 的属性类型转换为 int、Float、DecimalType、LongType,以使其与目标表 NUMERIC 数据类型兼容,但错误仍然存在。
有人可以帮忙解决这个问题吗?预先感谢。
DataFrame
中对应于BigQuery's NUMERIC
类型的列应以适当的精度和小数位数转换为 PySpark 的 DecimalType。我提供了一个示例,其中 log_id, fk_process_id, record_id,
和 predicted_ban
等列被转换为 DecimalType(38, 0)
以匹配 BigQuery 的 NUMERIC 数据类型。试试下面
from pyspark.sql.types import DecimalType
# Assuming df is your DataFrame
df = df \
.withColumn("log_id", df["log_id"].cast(DecimalType(38, 0))) \
.withColumn("fk_process_id", df["fk_process_id"].cast(DecimalType(38, 0))) \
.withColumn("record_id", df["record_id"].cast(DecimalType(38, 0))) \
.withColumn("predicted_ban", df["predicted_ban"].cast(DecimalType(38, 0)))