Confluent S3 Sink connector: error when using the Parquet format class with compression


I am using the S3 Sink connector to read from a Kafka Avro topic (with Schema Registry) and write to S3 in Parquet format. It works fine without compression, but when I enable gzip compression I get the following error:

{"source_host":"connector-avro-parquet-0","method":"put","level":"ERROR","ctx":{"stacktrace":"org.apache.kafka.connect.errors.RetriableException: org.apache.kafka.connect.errors.DataException: Multipart upload failed to complete.\n\tat io.confluent.connect.s3.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:524)\n\tat io.confluent.connect.s3.TopicPartitionWriter.commitOnTimeIfNoData(TopicPartitionWriter.java:303)\n\tat io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:194)\n\tat io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:191)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)\n\tat java.base\/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base\/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base\/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base\/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base\/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.DataException: Multipart upload failed to complete.\n\tat io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:169)\n\tat io.confluent.connect.s3.storage.S3ParquetOutputStream.close(S3ParquetOutputStream.java:36)\n\tat org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:865)\n\tat org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)\n\tat org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:310)\n\tat io.confluent.connect.s3.format.parquet.ParquetRecordWriterProvider$1.commit(ParquetRecordWriterProvider.java:108)\n\tat io.confluent.connect.s3.TopicPartitionWriter.commitFile(TopicPartitionWriter.java:544)\n\tat io.confluent.connect.s3.TopicPartitionWriter.commitFiles(TopicPartitionWriter.java:514)\n\t... 14 more\nCaused by: org.apache.kafka.connect.errors.ConnectException: Expected compressionFilter to be a DeflaterOutputStream, but was passed an instance that does not match that type.\n\tat io.confluent.connect.s3.storage.CompressionType$1.finalize(CompressionType.java:77)\n\tat io.confluent.connect.s3.storage.S3OutputStream.commit(S3OutputStream.java:161)\n\t... 21 more","exception_class":"org.apache.kafka.connect.errors.RetriableException","exception_message":"org.apache.kafka.connect.errors.DataException: Multipart upload failed to complete."}

The same problem occurs when I use the Avro format class with compression.
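To rule out the source data, this is roughly how the Avro payloads can be checked against Schema Registry outside the connector. This is only a minimal sketch using confluent-kafka-python, not part of my actual setup: the broker and registry URLs, the group id, and the assumption that keys are UTF-8 strings are all placeholders; the topic name matches the config below.

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

# Placeholder endpoints; adjust to the real broker and Schema Registry.
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})

consumer = DeserializingConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "parquet-sink-debug",           # throwaway group id for the check
    "auto.offset.reset": "earliest",
    "key.deserializer": StringDeserializer("utf_8"),   # assumes string keys
    "value.deserializer": AvroDeserializer(schema_registry),
})
consumer.subscribe(["input-topic-avro"])

# Read a handful of records and print the decoded Avro payloads.
for _ in range(5):
    msg = consumer.poll(10.0)
    if msg is None or msg.error():
        continue
    print(msg.topic(), msg.partition(), msg.offset(), msg.value())

consumer.close()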

Connector configuration:

{
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "errors.log.include.messages": "true",
    "s3.region": "us-east-1",
    "topics.dir": "parquet-parsed-data",
    "flush.size": "10000000",
    "s3.part.size": "62428800",
    "tasks.max": "10",
    "timezone": "UTC",
    "rotate.interval.ms": "-1",
    "locale": "en-US",
    "partition.fieldName": "day",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "format.bytearray.extension": "json",
    "errors.log.enable": "true",
    "s3.bucket.name": "data-lake-parquet",
    "partition.duration.ms": "6000000",
    "s3.ssea.name": "",
    "schema.compatibility": "NONE",
    "directory.delim": "/",
    "batch.size": "1000",
    "parquet.codec": "gzip",
    "store.url": "http://127.0.0.1:9000",
    "topics.regex": "input-topic-avro",
    "partition.field.name": "timestamp",
    "s3.compression.type": "gzip",
    "partitioner.class": "custom.WriterPartitioner",
    "name": "avro-parquet-writer",
    "partition.fields": "timestamp:no_timestamp",
    "errors.tolerance": "all",
    "connector.buffer.dir": "/data/",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "path.format": "YYYY-MM-dd",
    "rotate.schedule.interval.ms": "300000",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "timestamp"
}
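For reference, this is roughly how I push a config like the one above to the Kafka Connect REST API. It is only a minimal Python sketch, assuming the Connect worker's REST endpoint is at http://localhost:8083; only a few keys are repeated here and the rest come from the configuration shown above.

import json
import requests

CONNECT_URL = "http://localhost:8083"   # assumed Connect worker REST endpoint
NAME = "avro-parquet-writer"

config = {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "gzip",
    # ... remaining keys from the configuration above ...
}

# PUT /connectors/<name>/config creates or updates the connector idempotently.
resp = requests.put(f"{CONNECT_URL}/connectors/{NAME}/config", json=config)
resp.raise_for_status()
print(json.dumps(resp.json(), indent=2))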

Is there a way to fix this?

Thanks in advance!

What I am trying and what I expect: read Avro-serialized messages from a Schema-Registry-enabled Kafka topic and push them to AWS S3 as compressed Parquet using the Confluent S3 Sink connector. The connector is consuming messages from the topic but fails with the "Multipart upload failed" error. I expect to see compressed Parquet files (*.parquet.gzip) in the S3 bucket.
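To verify the output once files do land, I would check the codec recorded in the Parquet footer of a downloaded object. A minimal sketch, assuming pyarrow is installed; the local file path is a placeholder for a file pulled from the bucket:

import pyarrow.parquet as pq

pf = pq.ParquetFile("downloaded-from-s3.parquet")   # placeholder path
meta = pf.metadata

# Each column chunk records its own compression codec in the Parquet footer,
# so this prints e.g. GZIP or UNCOMPRESSED per column.
for rg in range(meta.num_row_groups):
    for col in range(meta.num_columns):
        chunk = meta.row_group(rg).column(col)
        print(chunk.path_in_schema, chunk.compression)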

amazon-s3 apache-kafka apache-kafka-connect s3-kafka-connector