StreamingFileSink bulk writer results in some checkpoint error when running on AWS EMR

Problem description · Votes: 2 · Answers: 1

Unable to use StreamingFileSink to store incoming events in compressed form.

I am trying to use StreamingFileSink to write an unbounded stream of events to S3. While doing so, I would like to compress the data to make better use of the available storage.

I wrote a compressed string writer by borrowing some code from Flink's SequenceFileWriterFactory. It fails with the exception I describe below.

If I use BucketingSink instead, it works great. With BucketingSink I handle compressed string writing as follows; again, I borrowed this code from other pull requests.

import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

import java.io.IOException;

public class CompressionStringWriter<T> extends StreamWriterBase<T> implements Writer<T> {

    private static final long serialVersionUID = 3231207311080446279L;

    private String codecName;

    private String separator;

    public String getCodecName() {
        return codecName;
    }

    public String getSeparator() {
        return separator;
    }

    private transient CompressionOutputStream compressedOutputStream;

    public CompressionStringWriter(String codecName, String separator) {
        this.codecName = codecName;
        this.separator = separator;
    }

    public CompressionStringWriter(String codecName) {
        this(codecName, System.lineSeparator());
    }

    protected CompressionStringWriter(CompressionStringWriter<T> other) {
        super(other);
        this.codecName = other.codecName;
        this.separator = other.separator;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName(codecName);
        if (codec == null) {
            throw new RuntimeException("Codec " + codecName + " not found");
        }
        Compressor compressor = CodecPool.getCompressor(codec, conf);
        compressedOutputStream = codec.createOutputStream(getStream(), compressor);

    }

    @Override
    public void close() throws IOException {
        if (compressedOutputStream != null) {
            compressedOutputStream.close();
            compressedOutputStream = null;
        } else {
            super.close();
        }
    }

    @Override
    public void write(Object element) throws IOException {
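        // getStream() from StreamWriterBase is called only to fail fast if the writer
        // has not been opened yet; its return value is intentionally unused here.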
        getStream();
        compressedOutputStream.write(element.toString().getBytes());
        compressedOutputStream.write(this.separator.getBytes());
    }

    @Override
    public CompressionStringWriter<T> duplicate() {
        return new CompressionStringWriter<>(this);
    }
}
BucketingSink<DeviceEvent> bucketingSink = new BucketingSink<>("s3://" + this.bucketName + "/" + this.objectPrefix);
bucketingSink
        .setBucketer(new OrgIdBasedBucketAssigner())
        .setWriter(new CompressionStringWriter<DeviceEvent>("Gzip", "\n"))
        .setPartPrefix("file-")
        .setPartSuffix(".gz")
        .setBatchSize(1_500_000);

The version with BucketingSink works.

Now, my code using StreamingFileSink involves the following set of classes.

import org.apache.flink.api.common.serialization.BulkWriter;

import java.io.IOException;

public class CompressedStringBulkWriter<T> implements BulkWriter<T> {

    private final CompressedStringWriter compressedStringWriter;

    public CompressedStringBulkWriter(final CompressedStringWriter compressedStringWriter) {
        this.compressedStringWriter = compressedStringWriter;
    }

    @Override
    public void addElement(T element) throws IOException {
        this.compressedStringWriter.write(element);
    }

    @Override
    public void flush() throws IOException {
        this.compressedStringWriter.flush();
    }

    @Override
    public void finish() throws IOException {
        this.compressedStringWriter.close();
    }
}
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class CompressedStringBulkWriterFactory<T> implements BulkWriter.Factory<T> {

    private SerializableHadoopConfiguration serializableHadoopConfiguration;

    public CompressedStringBulkWriterFactory(final Configuration hadoopConfiguration) {
        this.serializableHadoopConfiguration = new SerializableHadoopConfiguration(hadoopConfiguration);
    }

    @Override
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
        return new CompressedStringBulkWriter(new CompressedStringWriter(out, serializableHadoopConfiguration.get(), "Gzip", "\n"));
    }
}
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;

public class CompressedStringWriter<T> implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(CompressedStringWriter.class);

    private static final long serialVersionUID = 2115292142239557448L;


    private String separator;

    private transient CompressionOutputStream compressedOutputStream;

    public CompressedStringWriter(FSDataOutputStream out, Configuration hadoopConfiguration, String codecName, String separator) {
        this.separator = separator;
        try {
            Preconditions.checkNotNull(hadoopConfiguration, "Unable to determine hadoop configuration using path");
            CompressionCodecFactory codecFactory = new CompressionCodecFactory(hadoopConfiguration);
            CompressionCodec codec = codecFactory.getCodecByName(codecName);
            Preconditions.checkNotNull(codec, "Codec " + codecName + " not found");
            LOG.info("The codec name that was loaded from hadoop {}", codec);
            Compressor compressor = CodecPool.getCompressor(codec, hadoopConfiguration);
            this.compressedOutputStream = codec.createOutputStream(out, compressor);
            LOG.info("Setup a compressor for codec {} and compressor {}", codec, compressor);
        } catch (IOException ex) {
            throw new RuntimeException("Unable to compose a hadoop compressor for the path", ex);
        }
    }

    public void flush() throws IOException {
        if (compressedOutputStream != null) {
            compressedOutputStream.flush();
        }
    }

    public void close() throws IOException {
        if (compressedOutputStream != null) {
            compressedOutputStream.close();
            compressedOutputStream = null;
        }
    }

    public void write(T element) throws IOException {
        compressedOutputStream.write(element.toString().getBytes());
        compressedOutputStream.write(this.separator.getBytes());
    }
}
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableHadoopConfiguration implements Serializable {

    private static final long serialVersionUID = -1960900291123078166L;

    private transient Configuration hadoopConfig;

    SerializableHadoopConfiguration(Configuration hadoopConfig) {
        this.hadoopConfig = hadoopConfig;
    }

    Configuration get() {
        return this.hadoopConfig;
    }

    // --------------------
    private void writeObject(ObjectOutputStream out) throws IOException {
        this.hadoopConfig.write(out);
    }

    private void readObject(ObjectInputStream in) throws IOException {
        final Configuration config = new Configuration();
        config.readFields(in);

        if (this.hadoopConfig == null) {
            this.hadoopConfig = config;
        }
    }
}

My actual Flink job:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kinesisConsumerConfig = new Properties();
...
...
DataStream<DeviceEvent> kinesis =
        env.addSource(new FlinkKinesisConsumer<>(this.streamName, new DeviceEventSchema(), kinesisConsumerConfig)).name("source")
        .setParallelism(16)
        .setMaxParallelism(24);

final StreamingFileSink<DeviceEvent> bulkCompressStreamingFileSink = StreamingFileSink.<DeviceEvent>forBulkFormat(
                path,
                new CompressedStringBulkWriterFactory<>(
                        BucketingSink.createHadoopFileSystem(
                                new Path("s3a://"+ this.bucketName + "/" + this.objectPrefix),
                                null).getConf()))
                .withBucketAssigner(new OrgIdBucketAssigner())
                .build();

deviceEventDataStream.addSink(bulkCompressStreamingFileSink).name("bulkCompressStreamingFileSink").setParallelism(16);

env.execute();

I expect the data to be saved to S3 as multiple files. Unfortunately, no files are being created.

In the logs, I see the following exception:

2019-05-15 22:17:20,855 INFO  org.apache.flink.runtime.taskmanager.Task                     - Sink: bulkCompressStreamingFileSink (11/16) (c73684c10bb799a6e0217b6795571e22) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 1 for operator Sink: bulkCompressStreamingFileSink (11/16).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Sink: bulkCompressStreamingFileSink (11/16).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
    ... 8 more
Caused by: java.io.IOException: Stream closed.
    at org.apache.flink.fs.s3.common.utils.RefCountedFile.requireOpened(RefCountedFile.java:117)
    at org.apache.flink.fs.s3.common.utils.RefCountedFile.write(RefCountedFile.java:74)
    at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:105)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:199)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:166)
    at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:244)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:235)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:347)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)

So I am wondering what I am missing.

I am using the latest AWS EMR (5.23).


java apache-flink amazon-emr flink-streaming
1 Answer

0 votes

In CompressedStringBulkWriter#finish() (via CompressedStringWriter#close()) you end up calling close() on the CompressionOutputStream, which also closes the underlying stream, i.e. Flink's FSDataOutputStream. That stream has to stay open so that Flink's internals can complete the checkpoint properly and guarantee that the stream is recoverable. That is why you are getting the java.io.IOException: Stream closed shown in the stack trace above.
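
A minimal sketch of one way to fix this, assuming you keep writing newline-separated, GZIP-compressed records: have finish() call CompressionOutputStream#finish(), which writes the codec trailer without closing the wrapped stream, then flush, and leave closing the FSDataOutputStream entirely to Flink. Class and field names below mirror the ones in the question but are otherwise illustrative:

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class CompressedStringBulkWriter<T> implements BulkWriter<T> {

    private final CompressionOutputStream compressedOutputStream;
    private final byte[] separatorBytes;

    public CompressedStringBulkWriter(CompressionOutputStream compressedOutputStream, String separator) {
        this.compressedOutputStream = compressedOutputStream;
        this.separatorBytes = separator.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public void addElement(T element) throws IOException {
        compressedOutputStream.write(element.toString().getBytes(StandardCharsets.UTF_8));
        compressedOutputStream.write(separatorBytes);
    }

    @Override
    public void flush() throws IOException {
        compressedOutputStream.flush();
    }

    @Override
    public void finish() throws IOException {
        // finish() writes the compression trailer (e.g. the gzip footer) but, unlike close(),
        // leaves the wrapped FSDataOutputStream open so Flink can close and commit it itself.
        compressedOutputStream.finish();
        compressedOutputStream.flush();
    }
}

The factory from the question would then build the CompressionOutputStream around the FSDataOutputStream it receives in create() and pass it straight to this writer; StreamingFileSink closes the part file itself when it commits it on checkpoint.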
