如何从kafka流中获取最新值

问题描述 投票:0回答:1

我对Kafka和流媒体相当陌生。我有一个要求,就像每次运行kafka生产者和消费者时,我都应该获得生产者产生的唯一消息。

下面是生产者和消费者的基本代码

制作人

 val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("test", "key", jsonstring)
    producer.send(record)
    producer.close()

消费者

val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "earliest")
    props.put("group.id", "13")
    val consumer: KafkaConsumer[String, Map[String,Any]] = new KafkaConsumer[String, Map[String,Any]](props)
    consumer.subscribe(util.Arrays.asList("test"))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator){
        println(data.value())

      }

我正在使用的输入Json在下面

{

“ id”:1,

“名称”:“ foo”

}

现在我面临的问题是每次运行程序时我都会得到重复的值。例如,如果我两次运行代码,则消费者输出看起来像这样

{

“ id”:1,

“名称”:“ foo”

}

{

“ id”:1,

“名称”:“ foo”

}

我想要输出,就像我运行程序一样,应该消耗并打印生产者处理的唯一消息。

我尝试了一些事情,例如将消费者属性更改为最新的偏移量

props.put("auto.offset.reset", "latest")

我也尝试过以下提到的事情,但对我没有用How can I get the LATEST offset of a kafka topic?

您能建议其他选择吗?

我对Kafka和流媒体相当陌生。我有一个要求,就像每次运行kafka生产者和消费者时,我都应该获得生产者产生的唯一消息。以下是...

scala apache-kafka streaming kafka-consumer-api
1个回答
0
投票

Consumer

© www.soinside.com 2019 - 2024. All rights reserved.