When I use FileSink to save data from a Kafka source, the files never move from in-progress to finished

Problem description

When I use FileSink to save data from a Kafka source, the files never move from the in-progress state to the finished state. However, if I replace the data source with a randomly generated stream, this does not happen.

package org.SCAU.fileSink;
import org.SCAU.SerializerDeserializer.socialStockSerializerDeserializer;
import org.SCAU.model.socialMediaStocks2;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class exampleFromZhiHu {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);


        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(10));
        
        String intTopic = "test";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.199.165:5092");
        properties.setProperty("group.id", "test");
        FlinkKafkaConsumer<socialMediaStocks2> consumer = new FlinkKafkaConsumer<>(
                intTopic, new socialStockSerializerDeserializer(), properties
        );
        consumer.setStartFromLatest();
        DataStream<socialMediaStocks2> inputDataStream = env.addSource(consumer);

        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".txt")
                .build();

        FileSink<socialMediaStocks2> fileSink = FileSink
                .forRowFormat(
                        new Path("output/fileSinkTest"),
                        new SimpleStringEncoder<socialMediaStocks2>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withRollingPolicy(
                        DefaultRollingPolicy
                                .builder()
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(2))
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(1))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build()
                )
                .withOutputFileConfig(config)
                .build();


        inputDataStream.sinkTo(fileSink);

        env.execute();
    }
}

What I tried:

  1. changing the RollingPolicy parameters,
  2. changing the parallelism,
  3. changing the checkpoint interval.

Expected: the saved files eventually end up in the finished state.

apache-kafka apache-flink flink-streaming
1 Answer

You are using the legacy FlinkKafkaConsumer source. Switch to the newer KafkaSource and supply a watermark strategy.
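A minimal sketch of that change, assuming your socialStockSerializerDeserializer implements DeserializationSchema<socialMediaStocks2> (so it can be used as a value-only deserializer); the broker address, topic, and group id are taken from the question:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;

// Replaces the deprecated FlinkKafkaConsumer + env.addSource(...)
KafkaSource<socialMediaStocks2> source = KafkaSource.<socialMediaStocks2>builder()
        .setBootstrapServers("192.168.199.165:5092")
        .setTopics("test")
        .setGroupId("test")
        .setStartingOffsets(OffsetsInitializer.latest()) // equivalent of setStartFromLatest()
        .setValueOnlyDeserializer(new socialStockSerializerDeserializer())
        .build();

// env.fromSource requires an explicit WatermarkStrategy; noWatermarks() is
// enough here since the FileSink pipeline does not rely on event time.
DataStream<socialMediaStocks2> inputDataStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

The rest of the pipeline (OutputFileConfig, FileSink, sinkTo) can stay as-is; with checkpointing enabled, pending part files should then be committed to finished on each successful checkpoint.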

© www.soinside.com 2019 - 2024. All rights reserved.