从 Postgres 读取数据并写入 Google BigQuery 时架构不匹配

问题描述 投票:0回答:1

我创建了一个 pyspark 脚本来通过 Dataproc 将数据从 PG DB 迁移到 Google Bigquery,但是在 dataproc 上运行日志时遇到错误

引起:java.lang.NullPointerException:java.lang.IllegalStateException:意外类型:DecimalType(10,0)

Dataframe Schema

 |-- log_id:            long      (nullable = true)
 |-- prediction_date:   timestamp (nullable = true)
 |-- fk_process_id:     long      (nullable = true)
 |-- record_id:         long      (nullable = true)
 |-- model_output_json: string    (nullable = true)
 |-- predicted_ban:     long      (nullable = true)

Destination Table schema

log_id            NUMERIC   NULLABLE  
prediction_date   DATETIME  NULLABLE    
fk_process_id     NUMERIC   NULLABLE       
record_id         NUMERIC   NULLABLE    
model_output_json    JSON   NULLABLE      
predicted_ban     NUMERIC   NULLABLE

我的理解是,long 和 NUMERIC 数据类型之间存在数据类型不匹配,因此我尝试将 Long 的属性类型转换为 int、Float、DecimalType、LongType,以使其与目标表 NUMERIC 数据类型兼容,但错误仍然存在。

有人可以帮忙解决这个问题吗?预先感谢。

postgresql pyspark google-bigquery spark-streaming google-cloud-dataproc
1个回答
0
投票

DataFrame
中对应于
BigQuery's NUMERIC
类型的列应以适当的精度和小数位数转换为 PySpark 的 DecimalType。我提供了一个示例,其中
log_id, fk_process_id, record_id,
predicted_ban
等列被转换为
DecimalType(38, 0)
以匹配 BigQuery 的 NUMERIC 数据类型。试试下面

from pyspark.sql.types import DecimalType

# Assuming df is your DataFrame
df = df \
    .withColumn("log_id", df["log_id"].cast(DecimalType(38, 0))) \
    .withColumn("fk_process_id", df["fk_process_id"].cast(DecimalType(38, 0))) \
    .withColumn("record_id", df["record_id"].cast(DecimalType(38, 0))) \
    .withColumn("predicted_ban", df["predicted_ban"].cast(DecimalType(38, 0)))
© www.soinside.com 2019 - 2024. All rights reserved.