我使用以下命令在笔记本电脑上运行 Flink :
./bin/start-cluster.sh
我正在编写一个简单的 Flink 作业来从一个主题读取数据并为另一个主题生成相同的数据。
public class ThirdJob {
public static void main(String[] args) throws Exception {
String kafkaTopicSink = "test10";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test9")
.setGroupId("test160")
.setClientIdPrefix("x_x_x_x")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> myStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"read_from_kafka");
DataStream<Person> objectStream= myStream.map(new PersonMapper()).name("Map_each_record_to_the_person_object");
objectStream.print().name("Print_the_kafka_message");
KafkaSink<Person> myKafkaSink = KafkaSink.<Person>builder()
.setBootstrapServers("localhost:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(kafkaTopicSink)
.setValueSerializationSchema(new PersonSerializationSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
objectStream.sinkTo(myKafkaSink).name("write_data_to_the_kafka");
env.execute();
}
}
工作正常,但当我列出消费者组时,我没有在列表中看到我的组 ID 我正在使用此命令来显示消费者组列表:
./kafka-consumer-groups --list --bootstrap-server localhost:9092
Flink 使用 Consumer Group 吗?或者我需要进行其他配置吗?
您在作业中启用了检查点吗?
如果没有,您可能会想要它。 检查点是 Flink 的主要容错机制,也是某些操作(例如提交 Kafka 偏移量)如何发挥作用的基石。 Flink 将在幕后管理适当的偏移量并将其存储在内部,而不是仅仅依赖 Kafka 消费者组等。
您可以通过
enableCheckpointing()
功能添加此功能,如下所示:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing at some defined interval
env.enableCheckpointing(5000);