Error when changing a partition field in Iceberg from Spark


We are writing to Iceberg using Spark, and when we rename the partition field we get a validation error:

org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: some_date: void(1)

Iceberg seems to be referring to the existing table partition field name, which should no longer be relevant, since there is a new partition field and the write mode is "overwrite".

Any suggestions? Here is a minimal reproducible example:

Create the original table with the partition field 'some_date':

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

dataDF = [('1991-04-01',)]
schema = StructType([
    StructField('some_date', StringType(), True)])

spark = SparkSession.builder.master('local[1]').appName('example') \
    .getOrCreate()

df = spark.createDataFrame(data=dataDF, schema=schema)
spark.sql("use iprod")  # catalog
spark.sql("CREATE SCHEMA IF NOT EXISTS iprod.test_schema")

df.write.mode("overwrite").format("parquet").partitionBy('some_date').saveAsTable("iprod.test_schema.example")

Attempt to overwrite the table with the same code, but with the partition field renamed to some_date_2:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

dataDF = [('1991-04-01',)]
schema = StructType([
    StructField('some_date_2', StringType(), True)])

spark = SparkSession.builder.master('local[1]').appName('example') \
    .getOrCreate()

df = spark.createDataFrame(data=dataDF, schema=schema)
spark.sql("use iprod")  # catalog
spark.sql("CREATE SCHEMA IF NOT EXISTS iprod.test_schema")

df.write.mode("overwrite").format("parquet").partitionBy('some_date_2').saveAsTable("iprod.test_schema.example")

Full stack trace:

: org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: some_date: void(1)
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:46)
    at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:511)
    at org.apache.iceberg.PartitionSpec$Builder.build(PartitionSpec.java:503)
    at org.apache.iceberg.TableMetadata.reassignPartitionIds(TableMetadata.java:768)
    at org.apache.iceberg.TableMetadata.buildReplacement(TableMetadata.java:790)
    at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.newReplaceTableTransaction(BaseMetastoreCatalog.java:256)
    at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.createOrReplaceTransaction(BaseMetastoreCatalog.java:244)
    at org.apache.iceberg.CachingCatalog$CachingTableBuilder.createOrReplaceTransaction(CachingCatalog.java:244)
    at org.apache.iceberg.spark.SparkCatalog.stageCreateOrReplace(SparkCatalog.java:190)
    at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:197)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:40)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.doExecute(V2CommandExec.scala:55)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:686)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:619)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
Tags: apache-spark, pyspark, apache-iceberg
1 Answer

This error occurs because your table uses version 1 of the Iceberg table format. In v1, a dropped partition field is not removed from the partition spec but is replaced with a void transform (which is why the error mentions some_date: void), so the replace-table operation still finds a partition field whose source column no longer exists in the new schema.

You should upgrade the table to version 2 (the format-version table property). AFAIK, this can be done through SQL:

ALTER TABLE catalog.ns.table
SET TBLPROPERTIES (
  'format-version' = '2'
)
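
Since the question uses PySpark, the same statement can be issued from the session; the table name below is taken from the question:

spark.sql("""
    ALTER TABLE iprod.test_schema.example
    SET TBLPROPERTIES ('format-version' = '2')
""")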

But it also works with the DataFrame API v2, for example:

df.writeTo('catalog.ns.table').using("iceberg").tableProperty("format-version", "2").createOrReplace()
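
As a rough sketch, the second script from the question could be rewritten with this API instead of DataFrameWriter.saveAsTable (table and column names are reused from the question; note that partitionedBy expects Column expressions in PySpark):

from pyspark.sql.functions import col

# Create or replace the Iceberg table as format v2, partitioned by the new column
df.writeTo("iprod.test_schema.example") \
    .using("iceberg") \
    .tableProperty("format-version", "2") \
    .partitionedBy(col("some_date_2")) \
    .createOrReplace()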

You can read more about the Iceberg table format in the spec (and here you will find a summary of the changes between versions 1 and 2).

If you want to stick with version 1, you should DROP and then re-ADD the partition field (via ALTER TABLE), as sketched below.
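
A minimal sketch of those statements, assuming the Iceberg Spark SQL extensions are enabled and reusing the table and column names from the question (the new some_date_2 column must already exist in the table schema):

ALTER TABLE iprod.test_schema.example DROP PARTITION FIELD some_date;
ALTER TABLE iprod.test_schema.example ADD PARTITION FIELD some_date_2;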
