当我使用FileSink从kafka源保存数据时,文件无法从inprogress状态转换为finished状态,但是如果我用随机生成的流数据源替换数据源,则不会发生这种情况。
package org.SCAU.fileSink;
import org.SCAU.SerializerDeserializer.socialStockSerializerDeserializer;
import org.SCAU.model.socialMediaStocks2;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class exampleFromZhiHu {

    /**
     * Reads {@code socialMediaStocks2} records from a Kafka topic and writes them
     * to a row-format {@link FileSink} under {@code output/fileSinkTest}.
     *
     * <p>NOTE(review): FileSink promotes in-progress part files to "finished" only
     * when a checkpoint completes. Checkpointing is enabled below, but this job
     * still uses the legacy {@code FlinkKafkaConsumer}, which is deprecated — the
     * recommended fix for files stuck in the in-progress state is migrating to the
     * newer {@code KafkaSource} with an explicit {@code WatermarkStrategy}.
     * TODO: migrate to KafkaSource.
     *
     * @param args unused command-line arguments
     * @throws Exception propagated from {@code env.execute()}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        // Required: FileSink finalizes part files on checkpoint completion.
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));

        String intTopic = "test";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.199.165:5092");
        properties.setProperty("group.id", "test");

        // Fixed: parameterized type instead of a raw FlinkKafkaConsumer, so
        // addSource infers DataStream<socialMediaStocks2> without unchecked warnings.
        FlinkKafkaConsumer<socialMediaStocks2> consumer = new FlinkKafkaConsumer<>(
                intTopic, new socialStockSerializerDeserializer(), properties);
        consumer.setStartFromLatest();
        DataStream<socialMediaStocks2> inputDataStream = env.addSource(consumer);

        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();

        FileSink<socialMediaStocks2> fileSink = FileSink
                .forRowFormat(
                        new Path("output/fileSinkTest"),
                        new SimpleStringEncoder<socialMediaStocks2>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withRollingPolicy(
                        DefaultRollingPolicy
                                .builder()
                                // Roll on interval/inactivity; max part size 1 GiB.
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(2))
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(1))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .withOutputFileConfig(config)
                .build();

        inputDataStream.sinkTo(fileSink);
        env.execute();
    }
}
尝试过:
期望:保存的文件最终从 in-progress 状态转换为 finished 状态。
回答:您正在使用旧的 FlinkKafkaConsumer 源(已弃用)。请切换到较新的 KafkaSource,并为其提供水印生成器(WatermarkStrategy)。