When I run Apache Flink with a streaming sink to AWS S3, the standard version (forRowFormat) works fine:
    StreamingFileSink<String> s3sink = StreamingFileSink
        .forRowFormat(new Path(s3Url),
            (String element, OutputStream stream) -> {
                PrintStream out = new PrintStream(stream);
                out.println(element);
            })
        .withBucketAssigner(new BucketAssigner())
        .withRollingPolicy(DefaultRollingPolicy.builder()
            .withMaxPartSize(100)
            .withRolloverInterval(30000)
            .build())
        .withBucketCheckInterval(100)
        .build();
When I run the same thing with the bulk format and CompressWriterFactory:
    StreamingFileSink<String> s3sink = StreamingFileSink
        .forBulkFormat(new Path(s3Url), new CompressWriterFactory(new DefaultExtractor()))
        .withOutputFileConfig(outputFileConfig)
        .build();
it gives me the error below.
(Note: CompressWriterFactory works fine with the HDFS scheme 'hdfs://host:port/path'.)
    java.lang.UnsupportedOperationException: S3RecoverableFsDataOutputStream cannot sync state to S3. Use persist() to create a persistent recoverable intermediate point.
        at org.apache.flink.fs.s3.common.utils.RefCountedBufferingFileStream.sync(RefCountedBufferingFileStream.java:112)
        at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.sync(S3RecoverableFsDataOutputStream.java:126)
        at org.apache.flink.formats.compress.writers.NoCompressionBulkWriter.finish(NoCompressionBulkWriter.java:56)
        at org.apache.flink.streaming.api.functions.sink.filesystem.BulkPartWriter.closeForCommit(BulkPartWriter.java:62)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.prepareBucketForCheckpointing(Bucket.java:280)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onReceptionOfCheckpoint(Bucket.java:253)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotActiveBuckets(Buckets.java:250)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.snapshotState(Buckets.java:241)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.snapshotState(StreamingFileSink.java:422)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
        ...
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
This looks like a bug. You can open a JIRA issue using the description given here.
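To make the failure path in the question's stack trace concrete, here is a minimal, Flink-free sketch. It only models the mechanism: a writer whose finish() calls sync() on a stream that rejects sync(), next to one that only flushes. All class and method names below (NoSyncStream, SyncingWriter, FlushOnlyWriter, SyncFailureSketch) are hypothetical stand-ins, not Flink classes, and whether a flush-only finish() is the right fix inside Flink is an assumption to raise in the JIRA rather than something established here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a recoverable S3 stream: writes are buffered
// normally, but sync() is rejected, as in the trace from the question.
class NoSyncStream extends OutputStream {
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    @Override
    public void write(int b) {
        buffer.write(b);
    }

    void sync() {
        throw new UnsupportedOperationException("cannot sync state to S3");
    }
}

// Hypothetical writer whose finish() calls sync(), modelling the failing
// NoCompressionBulkWriter.finish() frame in the trace.
class SyncingWriter {
    private final NoSyncStream out;

    SyncingWriter(NoSyncStream out) {
        this.out = out;
    }

    void addElement(String element) throws IOException {
        out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
    }

    void finish() throws IOException {
        out.flush();
        out.sync(); // throws on the S3-like stream
    }
}

// Hypothetical variant that only flushes and leaves persisting to the caller.
class FlushOnlyWriter {
    private final NoSyncStream out;

    FlushOnlyWriter(NoSyncStream out) {
        this.out = out;
    }

    void addElement(String element) throws IOException {
        out.write((element + "\n").getBytes(StandardCharsets.UTF_8));
    }

    void finish() throws IOException {
        out.flush();
    }
}

class SyncFailureSketch {
    // Returns true if finish() fails the same way as in the question.
    static boolean syncingWriterFails() throws IOException {
        SyncingWriter writer = new SyncingWriter(new NoSyncStream());
        writer.addElement("a");
        try {
            writer.finish();
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    // Returns what the flush-only writer left in the stream's buffer.
    static String flushOnlyWriterOutput() throws IOException {
        NoSyncStream stream = new NoSyncStream();
        FlushOnlyWriter writer = new FlushOnlyWriter(stream);
        writer.addElement("a");
        writer.addElement("b");
        writer.finish();
        return new String(stream.buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        System.out.println("syncing writer fails: " + syncingWriterFails());
        System.out.print(flushOnlyWriterOutput());
    }
}
```

This would also be consistent with the row-format sink and the HDFS scheme both working: in the trace, the exception is raised only when the bulk writer's finish() reaches sync() on the S3 stream.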