Unable to save partitioned data in Iceberg format when using S3 and Glue

Problem description

The write fails with the following error:

 java.lang.IllegalStateException: Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec. Either cluster the incoming records or switch to fanout writers.
Encountered records that belong to already closed files:
partition 'year=2022/month=10/day=8/hour=12' in spec [
  1000: year: identity(24)
  1001: month: identity(25)
  1002: day: identity(26)
  1003: hour: identity(27)
]
        at org.apache.iceberg.io.ClusteredWriter.write(ClusteredWriter.java:96)
        at org.apache.iceberg.io.ClusteredDataWriter.write(ClusteredDataWriter.java:31)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:758)
        at org.apache.iceberg.spark.source.SparkWrite$PartitionedDataWriter.write(SparkWrite.java:728)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:442)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)

This is the query I am running on Spark 3.3 with a Glue catalog, saving to S3. The Iceberg version is 1.1.0:

CREATE TABLE my_catalog.test.iceberg_test
USING iceberg
PARTITIONED BY (year, month, day, hour)
AS SELECT * from data

But when I try to save the data without partitioning, it works fine:

CREATE TABLE my_catalog.test.iceberg_test
USING iceberg
AS SELECT * from data 

How do I fix this?

apache-spark amazon-s3 aws-glue apache-iceberg
1 Answer

According to the docs, the data needs to be sorted before writing:

Iceberg requires the data to be sorted according to the partition spec per task (Spark partition) prior to writing against a partitioned table. This applies both to writing with SQL and writing with DataFrames.
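
On the SQL write path, the same per-task sort can be expressed inside the query itself. A sketch, not the answerer's own fix, assuming Spark's SORT BY clause (a within-partition sort) applied to the partition columns; table and view names are the ones from the question:

# Sketch: SORT BY performs a per-task (within-partition) sort, which is what
# Iceberg's ClusteredWriter expects when writing to a partitioned table.
spark.sql("""
    CREATE TABLE my_catalog.test.iceberg_test
    USING iceberg
    PARTITIONED BY (year, month, day, hour)
    AS SELECT * FROM data
    SORT BY year, month, day, hour
""")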

This is how I solved the problem:

# Read the source data, then sort within each Spark partition by the
# table's partition columns before exposing it to the SQL query.
df = spark.read.orc("s3a://...")
df = df.sortWithinPartitions("year", "month", "day", "hour")
df.createOrReplaceTempView("data")

After that, the partitioned CTAS query ran without any issues.
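
The error message also suggests switching to fanout writers, which keep an open file per partition and therefore do not require clustered input, at the cost of holding more writers in memory. A minimal sketch, assuming Iceberg's fanout-enabled Spark write option (it overrides the table property write.spark.fanout.enabled):

from pyspark.sql.functions import col

df = spark.read.orc("s3a://...")
(
    df.writeTo("my_catalog.test.iceberg_test")
      .using("iceberg")
      .partitionedBy(col("year"), col("month"), col("day"), col("hour"))
      .option("fanout-enabled", "true")  # buffer one writer per partition; no sort needed
      .createOrReplace()
)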
