我正在尝试编写一个Glue(PySpark)作业执行一些ETL并最终将该数据写入SQL Server中的表(在AWS Glue目录中定义)。在将记录写入SQL Server表时,可能存在阻止某些记录(即“坏”记录)写入表的约束(示例:主键,外键,列类型)。发生这种情况时,Glue作业会抛出错误并且作业失败。有没有办法防止整个工作失败?相反,是否可以只编写“好”记录并将违反SQL Server的“坏”记录返回到胶水作业(以便将它们上传到S3)?
我使用write_dynamic_frame_from_catalog函数将数据写入SQL Server表。以下是一些上下文示例代码:
# perform etl
output_df=spark.sql("SELECT ...")
# create dataframe and write to SQL Server
output_dynamic_frame = DynamicFrame.fromDF(output_df, glueContext, 'output_dynamic_frame')
glueContext.write_dynamic_frame_from_catalog(frame = output_dynamic_frame, database="<DATABASE_NAME>", table_name="<TABLE_NAME>")
将数据写入SQL Server后,我希望返回违反SQL Server表约束的记录,以便将它们上载到S3。
我认为您可以使用AWS Glue将数据从数据库中提取到S3中,然后使用Pyspark,您可以在读取S3文件时获得“错误记录”:
corruptDF = (spark.read
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "_corrupt_record")
.csv("s3://bucket-name/path")
然后,您可以通过“columnNameOfCorruptRecord”字段进行过滤,并将“好的”保存到您的数据库,将“坏的”保存到S3路径。
此外,Databricks还有处理不良记录和文件here的功能,您可以在阅读文件时提供badRecordsPath
选项,以便将“行李记录”发送到该路径。请注意,这只适用于阅读csv,json和任何基于文件的内置源(例如镶木地板)