我在尝试从 Databricks 中的 S3 存储桶读取某些 Delta 表时遇到了挑战。我的目标是将 Delta 表加载到 Databricks 中,虽然某些表(如 table_1)加载成功,但其他表(如 table_2)会产生以下错误:
`AnalysisException:检测到不兼容的格式。
您正在尝试使用 Delta 从 SECOND_DATALAKE/table_2/ 读取数据,但不存在事务日志。检查上游作业以确保它正在使用 format("delta") 进行写入,并且您正在尝试从表基本路径读取。 `
我认为此问题背后的原因是我的 AWS Glue 作业无法为 table_2 等重型表生成 _delta_log 文件夹。以下是我的 AWS Glue 作业的代码片段:
table_to_copy = 'TABLE_2'
尝试:
# Oracle SQL 节点生成脚本
OracleSQL_node1 =glueContext.create_dynamic_frame.from_options(
连接类型=“甲骨文”,
连接选项={
“url”: 'jdbc:oracle:thin://@datalake.eu-west-3.rds.amazonaws.com:10523:NAME1',
“用户”:用户名,
“密码”:密码,
“dbtable”:“架构”。 + 要复制的表,
},
conversion_ctx="OracleSQL_node1",
)
# Convertir a DataFrame
dataFrame = OracleSQL_node1.toDF()
# Escribir en formato Delta
dataFrame.repartition(10) \
.write.format('delta') \
.mode('overwrite') \
.save("SECOND_DATALAKE" + table_to_copy)
logger.info(f"{table_to_copy} was copied to the Delta Lake correctly")
例外情况为 e: logger.error(f"将 {table_to_copy} 复制到 Delta Lake 时出错:{str(e)}")
job.commit() Spark.stop()`
有谁知道如何解决此问题并确保为所有表正确生成 _delta_log 文件夹,无论其大小如何?
AWS Glue 作业负责从 Oracle 中提取数据并以 Delta 格式写入 S3。 该错误专门针对 AWS Glue 视为“重”的表而发生。 寻找有关优化 AWS Glue 作业的建议,以确保为所有表生成 _delta_log 文件夹。
我也遇到类似的问题
但是我不能给你工作样本。
因为我正在使用 OSS Delta Lake(=Linux 基金会 Delta Lake)
此链接(创建表 - DataFrameWriter API)可能有助于了解使用元存储或不使用元存储的不同之处。
无论如何,让我们尝试将
/
添加到您的路径中,如下所示。
dataFrame.repartition(10) \
.write.format('delta') \
.mode('overwrite') \
//.save( "SECOND_DATALAKE" + table_to_copy) //not work
.save("/" + "SECOND_DATALAKE" + table_to_copy)
据我所知,这会起作用。
如果您的存储位于 s3 上,请参阅示例代码链接
// Example: Example: Create a Delta Lake table from a DataFrame
// and register the table to Glue Data Catalog
val additional_options = Map(
"path" -> "s3://<s3Path>"
)
dataFrame.write.format("delta")
.options(additional_options)
.mode("append") // based on your needs, append/overwrite
.partitionBy("<your_partitionkey_field>")
.saveAsTable("<your_database_name>.<your_table_name>")
Metastore 的更多信息
Spark 需要 Metastore,它可能是以下之一
元存储有
好吧,我们再看一下代码,
// save() --> a path(your path will created under spark.sql.warehouse.dir)
// You need to remember where table saved.
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")
// saveAsTable() --> table name.
// You don't need to remember where table saved.
df.write.format("delta").saveAsTable("default.people10m")
因此,如果您使用 Databricks + AWS Glue,最好将
saveAsTable()
与表名一起使用。
// This is naming of table.
{catalog_name}.{schema(or DB)_name}.{table_name}