setGroupId 在 flink 流作业中不起作用

问题描述 投票:0回答:1

我使用以下命令在笔记本电脑上运行 Flink :

./bin/start-cluster.sh

我正在编写一个简单的 Flink 作业来从一个主题读取数据并为另一个主题生成相同的数据。

public class ThirdJob {
public static void main(String[] args) throws Exception {
    String kafkaTopicSink = "test10";
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("test9")
            .setGroupId("test160")
            .setClientIdPrefix("x_x_x_x")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();



    DataStreamSource<String> myStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"read_from_kafka");

    DataStream<Person> objectStream= myStream.map(new PersonMapper()).name("Map_each_record_to_the_person_object");
    objectStream.print().name("Print_the_kafka_message");

    KafkaSink<Person> myKafkaSink = KafkaSink.<Person>builder()
            .setBootstrapServers("localhost:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic(kafkaTopicSink)
                    .setValueSerializationSchema(new PersonSerializationSchema())
                    .build()
            )
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();


    objectStream.sinkTo(myKafkaSink).name("write_data_to_the_kafka");

    env.execute();

}

}

工作正常,但当我列出消费者组时,我没有在列表中看到我的组 ID 我正在使用此命令来显示消费者组列表:

./kafka-consumer-groups  --list --bootstrap-server localhost:9092

Flink 使用 Consumer Group 吗?或者我需要进行其他配置吗?

apache-flink flink-streaming
1个回答
0
投票

您在作业中启用了检查点吗?

如果没有,您可能会想要它。 检查点是 Flink 的主要容错机制,也是某些操作(例如提交 Kafka 偏移量)如何发挥作用的基石。 Flink 将在幕后管理适当的偏移量并将其存储在内部,而不是仅仅依赖 Kafka 消费者组等。

您可以通过

enableCheckpointing()
功能添加此功能,如下所示:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing at some defined interval
env.enableCheckpointing(5000); 
© www.soinside.com 2019 - 2024. All rights reserved.