Redshift 将布尔数据类型解释为位,因此如果存在任何布尔数据类型列,则无法将 hudi 表从 S3 移动到 Redshift

问题描述 投票:0回答:1

我正在 AWS 中创建一个数据管道,用于通过 EMR 将数据从 S3 移动到 Redshift。数据以 HUDI 格式存储在 S3 的 parquet 文件中。我已经创建了用于完全负载传输的 Pyspark 脚本,并且出于 POC 目的,我在 S3 中的 hudi 表包含 4 列(id、名称、日期、已验证)。 Validated 是布尔数据类型,可以是 true 或 false。如果我删除已验证的列,则表将成功移动,但如果它包含布尔类型的已验证列,则脚本将引发以下错误:

File "/home/hadoop/move.py", line 41, in <module>
hudi_df.write.format("jdbc").options(**redshift_conn_options).mode("append").save()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1396, in save
File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in call
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o77.save.
: com.amazon.redshift.util.RedshiftException: ERROR: Column "table_1.validated" has unsupported type "bit".

Redshift 将布尔列(已验证)视为“位”数据类型,但实际上它是布尔值。我正在 EMR 集群中运行这个

Pyspark
脚本。

我正在使用的代码:

from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder \
    .appName("HUDI to Redshift") \
    .getOrCreate()

# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
'hoodie.table.name': 'test2',
'hoodie.datasource.write.recordkey.field': 'id',
#'hoodie.datasource.write.partitionpath.field': 'date',
#'hoodie.datasource.write.precombine.field': 'timestamp',
'hoodie.datasource.write.operation': 'insert',
#'hoodie.upsert.shuffle.parallelism': 150,
'hoodie.insert.shuffle.parallelism': 2
}

# Read the Hudi files into a DataFrame
hudi_df = spark.read \
.format('org.apache.hudi') \
.options(**hudiOptions) \
.load('s3a://new-incr-test-data-1/test2/')

# Define Redshift connection options
redshift_conn_options = {
    "url": "jdbc:redshift://central-redshift-1.e1tijnrau71v.us-east-1.redshift.amazonaws.com:5439/dev",
    "dbtable": "bol",
    "user": "rev_user",
    "password": "Pp42kded687m262",
    "aws_iam_role": "arn:aws:iam::678621547901:role/Redshift_Role"
}

# Write the Hudi DataFrame to Redshift
hudi_df.write.format("jdbc").options(**redshift_conn_options).mode("append").save()

# Stop Spark session
spark.stop()

我已经尝试了多种方法,例如在写入

Redshift
之前将“已验证”列转换为适当的布尔类型。对脚本进行了许多更改,但都不起作用。在将数据写入 Redshift 之前,我还会在屏幕上打印所有列数据类型,并且“已验证”列也显示为布尔类型,并且不知道为什么 Redshift 将其视为“位”数据类型。

我希望使用我的

Redshift
脚本将整个表和所有列从 S3 移动到
Pyspark

python pyspark amazon-redshift amazon-emr apache-hudi
1个回答
0
投票

使用spark jdbc数据源,您可以提供查询而不是表名。然后,您应该能够在启动 Spark 数据帧之前将列投射到红移侧。

所以替换:

 "dbtable": "bol"

 "dbtable": "(select col1, col2, cast(bool_col as integer) as book_col from bol) as tmp"
© www.soinside.com 2019 - 2024. All rights reserved.