我正在使用 Reactive kafka 来消费事件。 问题:我向队列中推送了 7 个事件,但消费者只消费了其中的 5 个。 (仅在部署在服务器上时发生,在本地环境中运行良好)。这种情况发生了很多次,我们无法弄清楚这里的原因是什么。我是反应式编程的新手,请提出更好的代码实践。
@PostConstruct
List<KafkaReceiver<String, String>> kafkaReceiverList = new ArrayList<>();
for (int i = 0; i < 5; i++) {
kafkaReceiverList.add(KafkaReceiver.create());
}
@EventListener(ApplicationStartedEvent.class)
for (KafkaReceiver<String, String> receiver : kafkaReceiverList) {
kafkaReceivers.add(receiver
.receive()
.log()
.bufferTimeout(500,10)
.flatMap(this::processRecord) // input - List<ReceiverRecord<String, String>>
.flatMap(this::commitRecord) // input - List<ReceiverRecord<String, String>>
.subscribe());
}
public Flux<Void> commitRecord(List<ReceiverRecord<String, String>> records) {
log.info(InfoMessageConstants.COMMIT_RECORD, records);
records.forEach(record -> record.receiverOffset().commit().subscribe());
return Flux.empty();
}
@PreDestroy
kafkaReceiverList.forEach(consumer -> {
try {
kafkaReceivers.stream()
.forEach(Disposable::dispose);
} catch (Exception ex) {
log.error("Error closing consumer: ", ex);
}
});
Why creating a list of receiver?
在partitions的基础上创建consumer,对consumer和partitions的数量分别有更多的控制
Is it reproducible in local environment?
没有
I am looking for reason, why some events are lost when I start service with this consumer.
Steps to reproduce on a server:
1. Stop consumer/Service
2. Push events to topic
3. Start Consumer.