AWS Glue job: SchemaColumnConvertNotSupportedException when trying to write Parquet files to S3

Problem description

I have a table in the AWS Glue Data Catalog whose columns are all typed as string, and the underlying files are stored in S3 as Parquet. I want to create a Glue job that simply reads the data from that catalog table, partitions the files by date, and writes them back to S3. However, I keep getting a SchemaColumnConvertNotSupportedException saying that a Parquet column cannot be converted.

I tried the ApplyMapping transform to make sure all of the data is represented as strings in the DynamicFrame:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import input_file_name

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)

spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

dyf = glueContext.create_dynamic_frame.from_catalog(database = "processed", table_name = "compass_2_daily_tkt_parquet")

mapping_dyf = ApplyMapping.apply(frame = dyf, mappings = [("tkt_id", "string", "tkt_id", "string"), ("tkt_nbr", "string", "tkt_nbr", "string"), ("tran_nbr", "string", "tran_nbr", "string"), ("pnr_ref", "string", "pnr_ref", "string"), ("provider_id", "string", "provider_id", "string"), ("carr_nbr", "string", "carr_nbr", "string"), ("agency_nbr", "string", "agency_nbr", "string"), ("point_of_orig_arpt_cd", "string", "point_of_orig_arpt_cd", "string"), ("dt_of_issue", "string", "dt_of_issue", "string"), ("curr_type_cd", "string", "curr_type_cd", "string"), ("fare_amt", "string", "fare_amt", "string"), ("fare_curr_type_cd", "string", "fare_curr_type_cd", "string"), ("tax_amt", "string", "tax_amt", "string"), ("fee_amt", "string", "fee_amt", "string"), ("comm_amt", "string", "comm_amt", "string"), ("doc_amt", "string", "doc_amt", "string"), ("pfc_amt", "string", "pfc_amt", "string"), ("proc_ped", "string", "proc_ped", "string"), ("intl_sale_ind", "string", "intl_sale_ind", "string"), ("e_tkt_ind", "string", "e_tkt_ind", "string"), ("fare_calc_ind", "string", "fare_calc_ind", "string"), ("tour_cd", "string", "tour_cd", "string"), ("dist_channel", "string", "dist_channel", "string"), ("cntry_cd", "string", "cntry_cd", "string"), ("stat_cd", "string", "stat_cd", "string"), ("tran_cd", "string", "tran_cd", "string"), ("data_source_cd", "string", "data_source_cd", "string"), ("data_sharing_id", "string", "data_sharing_id", "string"), ("load_ts", "timestamp", "load_ts", "timestamp"), ("net_fare_amt", "string", "net_fare_amt", "string"), ("suppl_fare_amt", "string", "suppl_fare_amt", "string"), ("contributed_ind", "string", "contributed_ind", "string"), ("print_cpui", "string", "print_cpui", "string"), ("file_id", "string", "file_id", "string"), ("print_fare_basis", "string", "print_fare_basis", "string"), ("waiver_cd", "string", "waiver_cd", "string"), ("comp_conj_cd", "string", "comp_conj_cd", "string"), ("tot_amt", "string", "tot_amt", "string"), ("bulk_ticketing_cd", "string", "bulk_ticketing_cd", "string"), ("equivalent_fare_amt", "string", "equivalent_fare_amt", "string"), ("equivalent_fare_curr_type_cd", "string", "equivalent_fare_curr_type_cd", "string"), ("fare_amt_usd", "string", "fare_amt_usd", "string"), ("tax_amt_usd", "string", "tax_amt_usd", "string"), ("doc_amt_usd", "string", "doc_amt_usd", "string"), ("fare_amt_eur", "string", "fare_amt_eur", "string"), ("tax_amt_eur", "string", "tax_amt_eur", "string"), ("doc_amt_eur", "string", "doc_amt_eur", "string"), ("depart_dt", "string", "depart_dt", "string"), ("trip_type_cd", "string", "trip_type_cd", "string"), ("tkt_dest_cd", "string", "tkt_dest_cd", "string"), ("dds_tkt_rndmzn_pcg", "string", "dds_tkt_rndmzn_pcg", "string")])

tkt_df = mapping_dyf.toDF().withColumn("filename", input_file_name())

# Attempt 1: repartition by source file name and write with the DataFrame API.
tkt_df.repartition("filename").write.partitionBy("filename").mode("append").parquet("s3://landing-spot")

# Attempt 2: write directly, passing partitionBy as a keyword argument.
tkt_df.write.parquet("s3://landing-spot", partitionBy=["filename"])

# Attempt 3: convert back to a DynamicFrame and write through the Glue sink.
tkt_dyf = DynamicFrame.fromDF(tkt_df, glueContext, "tkt_dyf")
datasink = glueContext.write_dynamic_frame.from_options(frame = tkt_dyf, connection_type = "s3", connection_options = {"path": "s3://landing-spot/", "partitionKeys": ["filename"]}, format = "parquet")

job.commit()
The job fails with the following stack trace:

Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.constructConvertNotSupportedException(VectorizedColumnReader.java:250)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBinaryBatch(VectorizedColumnReader.java:497)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:220)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:261)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
... 28 more
2019-08-07 19:57:41,651 ERROR [Executor task launch worker for task 7] executor.Executor (Logging.scala:logError(91)) - Exception in task 6.0 in stage 1.0 (TID 7)
org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file s3://file.parquet. Column: [tran_nbr], Expected: double, Found: BINARY

This confuses me, because I applied the mapping to the DynamicFrame, yet when the job writes the data out to S3, AWS Glue seems to try to infer the schema again.
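Judging from the stack trace, the exception is actually raised inside Spark's vectorized Parquet reader while the source files are being scanned, i.e. before ApplyMapping ever runs, which would explain why the mapping makes no difference. One way to narrow this down is to read one of the underlying files directly and compare its physical schema with the schema Glue derives for the table. This is only a diagnostic sketch, and the S3 path below is a placeholder, not the real file:

# Diagnostic sketch -- the path is a placeholder for one of the table's
# underlying Parquet files, not the real location.
sample_df = spark.read.parquet("s3://processed-bucket/compass_2_daily_tkt_parquet/part-00000.snappy.parquet")
sample_df.printSchema()   # physical types actually stored in this one file

# Schema Glue derives for the whole table / DynamicFrame, for comparison.
dyf.printSchema()

If one file reports tran_nbr as double while another file (or the catalog) reports string, the files themselves disagree, which is usually what an error like "Expected: double, Found: BINARY" points to.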

python apache-spark amazon-s3 pyspark aws-glue
1 Answer

Have you tried

spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
For more information on vectorized Parquet reading, see the Spark SQL configuration documentation.
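A minimal sketch of how that suggestion could be wired into the original job, assuming the setting is applied on the GlueContext's SparkSession before anything is read (variable names reuse the ones from the question):

spark = glueContext.spark_session
# Switch off the vectorized Parquet reader before any scan happens; the
# non-vectorized reader is slower but may tolerate conversions that the
# vectorized path refuses.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

# Then read from the catalog as before.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database="processed", table_name="compass_2_daily_tkt_parquet"
)

Note that if the source files genuinely store tran_nbr with different physical types, this setting only changes how the mismatch surfaces, so rewriting the offending files (or re-crawling the table) may still be necessary.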
