从 Spark 编写 AVRO 时,Iceberg 不工作

问题描述 投票:0回答:1

将 AVRO 文件从 GCS 附加到表时遇到以下错误。 avro 文件有效,但我们使用压缩的 avro,这是一个问题吗?

线程“streaming-job-executor-0”中的异常 java.lang.NoClassDefFoundError: org/apache/avro/InvalidAvroMagicException at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:101) at org.apache。 org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77) 位于 org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:37) 位于 org.apache.iceberg.re located.com.google.common.collect。 Iterables.addAll(Iterables.java:320) 在 org.apache.iceberg.re located.com.google.common.collect.Lists.newLinkedList(Lists.java:237) 在 org.apache.iceberg.ManifestLists.read(ManifestLists. java:46)在org.apache.iceberg.BaseSnapshot.cacheManifests(BaseSnapshot.java:127)在org.apache.iceberg.BaseSnapshot.dataManifests(BaseSnapshot.java:149)在org.apache.iceberg.MergingSnapshotProducer.apply(MergingSnapshotProducer) .java:343) 在 org.apache.iceberg.SnapshotProducer.apply(SnapshotProducer.java:163) 在 org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:276) 在 org.apache.iceberg.util .Tasks$Builder.runTaskWithRetry(Tasks.java:404) 在 org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:213) 在 org.apache.iceberg.util.Tasks$Builder.run(Tasks .java:197) 在 org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:189) 在 org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:275) 在 com.snapchat.transformer。 TransformerStreamingWorker.lambda$execute$d121240d$1(TransformerStreamingWorker.java:162) 在 org.apache.spark.streaming.api.java.JavaDStreamLike.$anonfun$foreachRDD$2(JavaDStreamLike.scala:280) 在 org.apache.spark.streaming .api.java.JavaDStreamLike.$anonfun$foreachRDD$2$改编(JavaDStreamLike.scala:280)在org.apache.spark.streaming.dstream.ForEachDStream.$anonfun$generateJob$2(ForEachDStream.scala:51)在scala.runtime .java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 在 org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) 在 org.apache.spark.streaming .dstream.ForEachDStream.$anonfun$generateJob$1(ForEachDStream.scala:51) 在 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 在 scala.util.Try$。应用(Try.scala:213)在org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)在org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.$anonfun$run$1( JobScheduler.scala:257) 在 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 在 scala.util.DynamicVariable.withValue(DynamicVariable.scala:62) 在 org.apache .spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:257) 在 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor) .java:624)在java.lang.Thread.run(Thread.java:748)引起:java.lang.ClassNotFoundException:org.apache.avro.InvalidAvroMagicException在java.net.URLClassLoader.findClass(URLClassLoader.java:382 )在 java.lang.ClassLoader.loadClass(ClassLoader.java:418) 在 java.lang.ClassLoader.loadClass(ClassLoader.java:351) ... 38 更多

日志显示iceberg表已经存在,但是我在gcs中看不到元数据文件?我正在从 dataproc 集群运行 Spark 作业,在哪里可以看到元数据文件?

#####################
Iceberg version: 0.11
spark version 3.0
#####################

    public void appendData(List<FileMetadata> publishedFiles, Schema icebergSchema) {
        TableIdentifier tableIdentifier = TableIdentifier.of(TRANSFORMER, jobConfig.streamName());
        // PartitionSpec partitionSpec = IcebergInternalFields.getPartitionSpec(tableSchema);
        HadoopTables tables = new HadoopTables(new Configuration());

       
        PartitionSpec partitionSpec = PartitionSpec.builderFor(icebergSchema)
                .build();

        Table table = null;
        if (tables.exists(tableIdentifier.name())) {
            table = tables.load(tableIdentifier.name());
        } else {
            table = tables.create(
                    icebergSchema,
                    partitionSpec,
                    tableIdentifier.name());
        }
        AppendFiles appendFiles = table.newAppend();
        for (FileMetadata fileMetadata : publishedFiles) {

            appendFiles.appendFile(DataFiles.builder(partitionSpec)
                    .withPath(fileMetadata.getFilename())
                    .withFileSizeInBytes(fileMetadata.getFileSize())
                    .withRecordCount(fileMetadata.getRowCount())
                    .withFormat(FileFormat.AVRO)
                    .build());
        }
        appendFiles.commit();
    }
apache-spark google-cloud-storage spark-avro apache-iceberg
1个回答
0
投票

以下两件事解决了我的问题

  • 确保我为冰山表提供了正确的路径名(在我的例子中带有 gs:// 前缀)

  • 解决了apache.avro依赖版本冲突

© www.soinside.com 2019 - 2024. All rights reserved.