从 Databricks 中的 S3 读取增量表时出现问题 (_delta_log)

问题描述 投票:0回答:1

我在尝试从 Databricks 中的 S3 存储桶读取某些 Delta 表时遇到了挑战。我的目标是将 Delta 表加载到 Databricks 中,虽然某些表(如 table_1)加载成功,但其他表(如 table_2)会产生以下错误:

`AnalysisException:检测到不兼容的格式。

您正在尝试使用 Delta 从 SECOND_DATALAKE/table_2/ 读取数据,但不存在事务日志。检查上游作业以确保它正在使用 format("delta") 进行写入,并且您正在尝试从表基本路径读取。 `

我认为此问题背后的原因是我的 AWS Glue 作业无法为 table_2 等重型表生成 _delta_log 文件夹。以下是我的 AWS Glue 作业的代码片段:

 
table_to_copy = 'TABLE_2' 尝试: # Oracle SQL 节点生成脚本 OracleSQL_node1 =glueContext.create_dynamic_frame.from_options( 连接类型=“甲骨文”, 连接选项={ “url”: 'jdbc:oracle:thin://@datalake.eu-west-3.rds.amazonaws.com:10523:NAME1', “用户”:用户名, “密码”:密码, “dbtable”:“架构”。 + 要复制的表, }, conversion_ctx="OracleSQL_node1", )

# Convertir a DataFrame
dataFrame = OracleSQL_node1.toDF()

# Escribir en formato Delta
dataFrame.repartition(10) \
    .write.format('delta') \
    .mode('overwrite') \
    .save("SECOND_DATALAKE" + table_to_copy)  
    
logger.info(f"{table_to_copy} was copied to the Delta Lake correctly")

例外情况为 e: logger.error(f"将 {table_to_copy} 复制到 Delta Lake 时出错:{str(e)}")

job.commit() Spark.stop()`

有谁知道如何解决此问题并确保为所有表正确生成 _delta_log 文件夹,无论其大小如何?

AWS Glue 作业负责从 Oracle 中提取数据并以 Delta 格式写入 S3。 该错误专门针对 AWS Glue 视为“重”的表而发生。 寻找有关优化 AWS Glue 作业的建议,以确保为所有表生成 _delta_log 文件夹。

amazon-web-services aws-glue delta-lake aws-databricks
1个回答
0
投票

我也遇到类似的问题

但是我不能给你工作样本。
因为我正在使用 OSS Delta Lake(=Linux 基金会 Delta Lake)

此链接(创建表 - DataFrameWriter API)可能有助于了解使用元存储或不使用元存储的不同之处。

无论如何,让我们尝试将

/
添加到您的路径中,如下所示。

dataFrame.repartition(10) \
    .write.format('delta') \
    .mode('overwrite') \
    //.save(    "SECOND_DATALAKE" + table_to_copy) //not work
    .save("/" + "SECOND_DATALAKE" + table_to_copy)    

据我所知,这会起作用。
如果您的存储位于 s3 上,请参阅示例代码链接

// Example: Example: Create a Delta Lake table from a DataFrame
// and register the table to Glue Data Catalog

val additional_options = Map(
  "path" -> "s3://<s3Path>"
)
dataFrame.write.format("delta")
  .options(additional_options)
  .mode("append")   // based on your needs, append/overwrite
  .partitionBy("<your_partitionkey_field>")
  .saveAsTable("<your_database_name>.<your_table_name>")

Metastore 的更多信息

Spark 需要 Metastore,它可能是以下之一

  • hive(或 hive-metastore)
  • AWS Glue:仅限 AWS
  • Unity 目录:仅限 Databricks
  • Derby:嵌入式或自行配置(利于开发)

元存储有

  • 表名称及其位置(存储上的路径)和属性。
  • 带有基本路径的架构(数据库名称);又名“spark.sql.warehouse.dir”

好吧,我们再看一下代码,

// save() --> a path(your path will created under spark.sql.warehouse.dir)
// You need to remember where table saved.
df.write.format("delta").mode("overwrite").save("/tmp/delta/people10m")

// saveAsTable() --> table name.
// You don't need to remember where table saved.
df.write.format("delta").saveAsTable("default.people10m")

因此,如果您使用 Databricks + AWS Glue,最好将

saveAsTable()
与表名一起使用。

// This is naming of table.
{catalog_name}.{schema(or DB)_name}.{table_name}
© www.soinside.com 2019 - 2024. All rights reserved.