Unable to use StreamingFileSink to store incoming events in compressed form.
I am trying to write an unbounded stream of events to S3 using StreamingFileSink. While doing so, I want to compress the data to make better use of the available storage.
I wrote a compressed string writer by borrowing some code from Flink's SequenceFileWriterFactory. It fails with the exception I describe below.
If I use BucketingSink instead, it works fine. With BucketingSink, I handle compressed strings as follows. Again, I borrowed this code from other pull requests.
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

import java.io.IOException;

public class CompressionStringWriter<T> extends StreamWriterBase<T> implements Writer<T> {

    private static final long serialVersionUID = 3231207311080446279L;

    private String codecName;
    private String separator;

    private transient CompressionOutputStream compressedOutputStream;

    public String getCodecName() {
        return codecName;
    }

    public String getSeparator() {
        return separator;
    }

    public CompressionStringWriter(String codecName, String separator) {
        this.codecName = codecName;
        this.separator = separator;
    }

    public CompressionStringWriter(String codecName) {
        this(codecName, System.lineSeparator());
    }

    protected CompressionStringWriter(CompressionStringWriter<T> other) {
        super(other);
        this.codecName = other.codecName;
        this.separator = other.separator;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        Configuration conf = fs.getConf();
        CompressionCodecFactory codecFactory = new CompressionCodecFactory(conf);
        CompressionCodec codec = codecFactory.getCodecByName(codecName);
        if (codec == null) {
            throw new RuntimeException("Codec " + codecName + " not found");
        }
        Compressor compressor = CodecPool.getCompressor(codec, conf);
        // Wrap the sink's raw output stream in a compressing codec stream.
        compressedOutputStream = codec.createOutputStream(getStream(), compressor);
    }

    @Override
    public void close() throws IOException {
        if (compressedOutputStream != null) {
            // Closing the codec stream also closes the wrapped base stream,
            // so super.close() is only needed if the codec stream was never created.
            compressedOutputStream.close();
            compressedOutputStream = null;
        } else {
            super.close();
        }
    }

    @Override
    public void write(Object element) throws IOException {
        getStream(); // ensures the writer has been opened
        compressedOutputStream.write(element.toString().getBytes());
        compressedOutputStream.write(this.separator.getBytes());
    }

    @Override
    public CompressionStringWriter<T> duplicate() {
        return new CompressionStringWriter<>(this);
    }
}
BucketingSink<DeviceEvent> bucketingSink =
    new BucketingSink<>("s3://" + this.bucketName + "/" + this.objectPrefix);
bucketingSink
    .setBucketer(new OrgIdBasedBucketAssigner())
    .setWriter(new CompressionStringWriter<DeviceEvent>("Gzip", "\n"))
    .setPartPrefix("file-")
    .setPartSuffix(".gz")
    .setBatchSize(1_500_000);
The version with BucketingSink works. My StreamingFileSink version, by contrast, involves the following set of classes.
import org.apache.flink.api.common.serialization.BulkWriter;

import java.io.IOException;

public class CompressedStringBulkWriter<T> implements BulkWriter<T> {

    private final CompressedStringWriter compressedStringWriter;

    public CompressedStringBulkWriter(final CompressedStringWriter compressedStringWriter) {
        this.compressedStringWriter = compressedStringWriter;
    }

    @Override
    public void addElement(T element) throws IOException {
        this.compressedStringWriter.write(element);
    }

    @Override
    public void flush() throws IOException {
        this.compressedStringWriter.flush();
    }

    @Override
    public void finish() throws IOException {
        this.compressedStringWriter.close();
    }
}
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

public class CompressedStringBulkWriterFactory<T> implements BulkWriter.Factory<T> {

    private SerializableHadoopConfiguration serializableHadoopConfiguration;

    public CompressedStringBulkWriterFactory(final Configuration hadoopConfiguration) {
        this.serializableHadoopConfiguration = new SerializableHadoopConfiguration(hadoopConfiguration);
    }

    @Override
    public BulkWriter<T> create(FSDataOutputStream out) throws IOException {
        return new CompressedStringBulkWriter<>(
            new CompressedStringWriter<>(out, serializableHadoopConfiguration.get(), "Gzip", "\n"));
    }
}
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.util.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;

public class CompressedStringWriter<T> implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(CompressedStringWriter.class);

    private static final long serialVersionUID = 2115292142239557448L;

    private String separator;

    private transient CompressionOutputStream compressedOutputStream;

    public CompressedStringWriter(FSDataOutputStream out, Configuration hadoopConfiguration, String codecName, String separator) {
        this.separator = separator;
        try {
            Preconditions.checkNotNull(hadoopConfiguration, "Unable to determine hadoop configuration using path");
            CompressionCodecFactory codecFactory = new CompressionCodecFactory(hadoopConfiguration);
            CompressionCodec codec = codecFactory.getCodecByName(codecName);
            Preconditions.checkNotNull(codec, "Codec " + codecName + " not found");
            LOG.info("The codec name that was loaded from hadoop {}", codec);
            Compressor compressor = CodecPool.getCompressor(codec, hadoopConfiguration);
            // Wrap Flink's recoverable output stream in a compressing codec stream.
            this.compressedOutputStream = codec.createOutputStream(out, compressor);
            LOG.info("Setup a compressor for codec {} and compressor {}", codec, compressor);
        } catch (IOException ex) {
            throw new RuntimeException("Unable to compose a hadoop compressor for the path", ex);
        }
    }

    public void flush() throws IOException {
        if (compressedOutputStream != null) {
            compressedOutputStream.flush();
        }
    }

    public void close() throws IOException {
        if (compressedOutputStream != null) {
            compressedOutputStream.close();
            compressedOutputStream = null;
        }
    }

    public void write(T element) throws IOException {
        compressedOutputStream.write(element.toString().getBytes());
        compressedOutputStream.write(this.separator.getBytes());
    }
}
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableHadoopConfiguration implements Serializable {

    private static final long serialVersionUID = -1960900291123078166L;

    private transient Configuration hadoopConfig;

    SerializableHadoopConfiguration(Configuration hadoopConfig) {
        this.hadoopConfig = hadoopConfig;
    }

    Configuration get() {
        return this.hadoopConfig;
    }

    // -------------------- custom (de)serialization --------------------

    private void writeObject(ObjectOutputStream out) throws IOException {
        this.hadoopConfig.write(out);
    }

    private void readObject(ObjectInputStream in) throws IOException {
        final Configuration config = new Configuration();
        config.readFields(in);
        // hadoopConfig is transient, so it is null after default deserialization.
        if (this.hadoopConfig == null) {
            this.hadoopConfig = config;
        }
    }
}
My actual Flink job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties kinesisConsumerConfig = new Properties();
...
...
DataStream<DeviceEvent> kinesis =
    env.addSource(new FlinkKinesisConsumer<>(this.streamName, new DeviceEventSchema(), kinesisConsumerConfig))
        .name("source")
        .setParallelism(16)
        .setMaxParallelism(24);

final StreamingFileSink<DeviceEvent> bulkCompressStreamingFileSink =
    StreamingFileSink.<DeviceEvent>forBulkFormat(
            path,
            new CompressedStringBulkWriterFactory<>(
                BucketingSink.createHadoopFileSystem(
                    new Path("s3a://" + this.bucketName + "/" + this.objectPrefix), null).getConf()))
        .withBucketAssigner(new OrgIdBucketAssigner())
        .build();

deviceEventDataStream.addSink(bulkCompressStreamingFileSink)
    .name("bulkCompressStreamingFileSink")
    .setParallelism(16);

env.execute();
I expect the data to be saved in S3 as multiple files. Unfortunately, no files are being created. In the logs, I see the following exception:
2019-05-15 22:17:20,855 INFO org.apache.flink.runtime.taskmanager.Task - Sink: bulkCompressStreamingFileSink (11/16) (c73684c10bb799a6e0217b6795571e22) switched from RUNNING to FAILED.
java.lang.Exception: Could not perform checkpoint 1 for operator Sink: bulkCompressStreamingFileSink (11/16).
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not complete snapshot 1 for operator Sink: bulkCompressStreamingFileSink (11/16).
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
    ... 8 more
Caused by: java.io.IOException: Stream closed.
    at org.apache.flink.fs.s3.common.utils.RefCountedFile.requireOpened(RefCountedFile.java:117)
    at org.apache.flink.fs.s3.common.utils.RefCountedFile.write(RefCountedFile.java:74)
    at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.flush(RefCountedBufferingFileStream.java:105)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeAndUploadPart(S3RecoverableFsDataOutputStream.java:199)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:166)
    at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
    at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:63)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:244)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:235)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:347)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:395)
So I am wondering what I am missing. I am using the latest AWS EMR (5.23).
In CompressedStringBulkWriter#finish() (which delegates to CompressedStringWriter#close()), you are calling close() on the CompressionOutputStream, and that also closes the underlying stream, i.e. Flink's FSDataOutputStream. That stream must stay open so that Flink's internals can complete the checkpoint correctly and guarantee that the stream is recoverable. That is why you get the java.io.IOException: Stream closed shown in your log.
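The minimal fix is to have BulkWriter#finish() end the compressed stream without closing the stream it wraps. Hadoop's CompressionOutputStream#finish() does exactly that: it writes the compression trailer but leaves the underlying stream open. Below is a rough sketch of a corrected bulk writer under that assumption; GzipStringBulkWriter is a hypothetical name, not part of your code:

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.IOException;

// Hypothetical variant of the question's CompressedStringBulkWriter that
// leaves Flink's FSDataOutputStream open for the sink to manage.
public class GzipStringBulkWriter<T> implements BulkWriter<T> {

    private final CompressionOutputStream out; // wraps Flink's FSDataOutputStream
    private final byte[] separator;

    public GzipStringBulkWriter(CompressionOutputStream out, String separator) {
        this.out = out;
        this.separator = separator.getBytes();
    }

    @Override
    public void addElement(T element) throws IOException {
        out.write(element.toString().getBytes());
        out.write(separator);
    }

    @Override
    public void flush() throws IOException {
        out.flush();
    }

    @Override
    public void finish() throws IOException {
        // Write the compression trailer WITHOUT closing the underlying stream;
        // the StreamingFileSink closes/persists the FSDataOutputStream itself.
        out.finish();
        out.flush();
        // no out.close() here!
    }
}

The same one-line change should work in your own CompressedStringBulkWriter: call compressedOutputStream.finish() instead of compressedOutputStream.close() in finish(). This matches the BulkWriter contract, whose finish() must flush and finalize the output but must not close the stream; closing is done by the StreamingFileSink when it commits or persists the part file at a checkpoint.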