KafkaConsumer读取所有记录

问题描述 投票:0回答:2

我想测试一个kafka的例子: 制作人:

object ProducerApp extends App {

val topic = "topicTest"
val  props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
for(i <- 0 to 20)
{    
val record = new ProducerRecord(topic, "key "+i," value "+i)    
producer.send(record)    
Thread.sleep(100)    
}
}

消费者:

object ConsumerApp extends App {
val topic = "topicTest"  
val properties = new Properties
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer")  
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")  
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
val consumer = new KafkaConsumer[String, String](properties)  
consumer.subscribe(scala.List(topic).asJava)    
while (true) {
val records:ConsumerRecords[String,String] = consumer.poll(200)
println("records size "+records.count())    
}  
}

主题“topicTest”创建有 1 个分区。

预期结果是:

records size 21
records size 21
records size 21
records size 21
...

但是得到的结果是:

records size 21
records size 0
records size 21
records size 0
records size 21
records size 0
...

消费者交替读取记录。我想了解原因。 谢谢

apache-kafka kafka-consumer-api
2个回答
0
投票

在我看来,您看到此行为的原因可能是您为

poll()
设置的短超时。
200
毫秒的超时可能不足以在偏移重置后重试内部
pollOnce()
(link)。消费者在需要重置偏移量且不返回任何记录的
pollOnce()
后超时。在您的应用程序的下一个循环中,当调用
poll()
时,偏移量已经是我们需要的。所以
200
毫秒可能足以检索记录。在第三次调用时重复上述行为。

请注意,对

seekToBeginning()
的调用不会立即重置偏移量,而是标记分区以进行偏移量重置。实际重置发生在下一次调用
poll()
position()
.

尝试增加

poll()
超时,这应该有望摆脱您得到的
record size 0
输出。


0
投票

Kafka 不会将数据存储在连续的偏移量中。一些偏移量留空。对于这些偏移量,您将检索一个空数组。这是正常的,只是拒绝那些抵消。

© www.soinside.com 2019 - 2024. All rights reserved.