我正在使用一个值+记录 Kafka 生产者:
bin/kafka-console-producer.sh --topic test3 --property "parse.key=true" --property "key.separator=:" --bootstrap-server localhost:9092
但我发现很难理解如何使用 Flink Kafka 消费者读取这些 Kafka 记录
KafkaSource
。我希望能够做这样的事情:
record.getValue(), record.getKey(), record.getTimestamp()...
这是我当前的代码,仅从 Kafka 读取非键控记录
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(ip)
.setTopics("test3")
.setGroupId("1")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
.build();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.map((MapFunction<String, String>) value -> "Receiving from Kafka : " + value).print();
我可以获取我正在寻找的示例吗?
您需要实现一个
KafkaRecordDeserializationSchema
(但不是valueOnly),然后在其反序列化方法中您将可以访问ConsumerRecord,并且您可以使用它的键、值、标头等来生成您想要的任何类型。
阅读 Apache Kafka® 标头 中有一个示例,它是 Immerok Apache Flink Cookbook 的一部分。请注意,虽然该示例从记录的标头访问主题、分区、偏移量和时间戳,但它不使用密钥,该密钥可用作
record.key()
。
注:我为 Immerok 工作。