我对Kafka和流媒体相当陌生。我有一个要求,就像每次运行kafka生产者和消费者时,我都应该获得生产者产生的唯一消息。
下面是生产者和消费者的基本代码
制作人
val props = new Properties() props.put("bootstrap.servers", "localhost:9092") props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer") props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer") val producer = new KafkaProducer[String, String](props) val record = new ProducerRecord[String, String]("test", "key", jsonstring) producer.send(record) producer.close()
消费者
val props = new Properties() props.put("bootstrap.servers", "localhost:9092") props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("auto.offset.reset", "earliest") props.put("group.id", "13") val consumer: KafkaConsumer[String, Map[String,Any]] = new KafkaConsumer[String, Map[String,Any]](props) consumer.subscribe(util.Arrays.asList("test")) while (true) { val record = consumer.poll(1000).asScala for (data <- record.iterator){ println(data.value()) }
我正在使用的输入Json在下面
{
“ id”:1,
“名称”:“ foo”
}
现在我面临的问题是每次运行程序时我都会得到重复的值。例如,如果我两次运行代码,则消费者输出看起来像这样
{
“ id”:1,
“名称”:“ foo”
}
{
“ id”:1,
“名称”:“ foo”
}
我想要输出,就像我运行程序一样,应该消耗并打印生产者处理的唯一消息。
我尝试了一些事情,例如将消费者属性更改为最新的偏移量
props.put("auto.offset.reset", "latest")
我也尝试过以下提到的事情,但对我没有用How can I get the LATEST offset of a kafka topic?
您能建议其他选择吗?
我对Kafka和流媒体相当陌生。我有一个要求,就像每次运行kafka生产者和消费者时,我都应该获得生产者产生的唯一消息。以下是...
Consumer