使用 apache Flink 读取带键的 Kafka 记录?

问题描述 投票:0回答:1

我正在使用一个值+记录 Kafka 生产者:

bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092

但我发现很难理解如何使用 Flink Kafka 消费者读取这些 Kafka 记录

KafkaSource
。我希望能够做这样的事情:

record.getValue(), record.getKey(), record.getTimestamp()...

这是我当前的代码,仅从 Kafka 读取非键控记录

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(ip)
        .setTopics("test3")
        .setGroupId("1")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
        .build();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.map((MapFunction<String, String>) value -> "Receiving from Kafka : " + value).print();

我可以获取我正在寻找的示例吗?

java apache-kafka apache-flink flink-streaming
1个回答
3
投票

您需要实现一个

KafkaRecordDeserializationSchema
(但不是valueOnly),然后在其反序列化方法中您将可以访问ConsumerRecord,并且您可以使用它的键、值、标头等来生成您想要的任何类型。

阅读 Apache Kafka® 标头 中有一个示例,它是 Immerok Apache Flink Cookbook 的一部分。请注意,虽然该示例从记录的标头访问主题、分区、偏移量和时间戳,但它不使用密钥,该密钥可用作

record.key()

注:我为 Immerok 工作。

© www.soinside.com 2019 - 2024. All rights reserved.