Events lost when consuming with Reactor Kafka

Problem description

I am using reactive Kafka (reactor-kafka) to consume events. The problem: I pushed 7 events to the topic, but the consumer only consumed 5 of them. (This happens only when the service is deployed on a server; it works fine in the local environment.) It has happened many times and we cannot figure out the cause. I am new to reactive programming, so please also suggest better code practices.

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import org.springframework.boot.context.event.ApplicationStartedEvent;
    import org.springframework.context.event.EventListener;
    import reactor.core.Disposable;
    import reactor.core.publisher.Flux;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverRecord;

    // one KafkaReceiver per partition, plus the subscriptions created from them
    private final List<KafkaReceiver<String, String>> kafkaReceiverList = new ArrayList<>();
    private final List<Disposable> kafkaReceivers = new ArrayList<>();

    @PostConstruct
    public void createReceivers() {
        // receiverOptions() builds the ReceiverOptions (bootstrap servers, group id,
        // deserializers, topic subscription); it is not shown in the question
        for (int i = 0; i < 5; i++) {
            kafkaReceiverList.add(KafkaReceiver.create(receiverOptions()));
        }
    }

    @EventListener(ApplicationStartedEvent.class)
    public void startReceivers() {
        for (KafkaReceiver<String, String> receiver : kafkaReceiverList) {
            kafkaReceivers.add(receiver
                    .receive()
                    .log()
                    .bufferTimeout(500, Duration.ofMillis(10)) // batch up to 500 records; timeout unit assumed, the original passed a bare 10
                    .flatMap(this::processRecord) // input - List<ReceiverRecord<String, String>>
                    .flatMap(this::commitRecord)  // input - List<ReceiverRecord<String, String>>
                    .subscribe());
        }
    }

    public Flux<Void> commitRecord(List<ReceiverRecord<String, String>> records) {
        log.info(InfoMessageConstants.COMMIT_RECORD, records);
        // fire-and-forget: each offset commit is subscribed separately and never awaited
        records.forEach(record -> record.receiverOffset().commit().subscribe());
        return Flux.empty();
    }

    @PreDestroy
    public void stopReceivers() {
        // dispose each subscription once
        kafkaReceivers.forEach(subscription -> {
            try {
                subscription.dispose();
            } catch (Exception ex) {
                log.error("Error closing consumer: ", ex);
            }
        });
    }
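
Since the question also asks for better code practices: one commonly suggested pattern with reactor-kafka is to acknowledge each record only after it has been processed and let the receiver commit acknowledged offsets on an interval, instead of committing fire-and-forget inside a flatMap, which neither waits for the commit nor preserves record order. The sketch below is only an illustration of that pattern; the broker address, group id, topic name and handleRecord are placeholders, not taken from the code above.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import reactor.core.publisher.Mono;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;
    import reactor.kafka.receiver.ReceiverRecord;

    public class OrderedCommitSketch {

        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
                    .commitInterval(Duration.ofSeconds(1)) // commit acknowledged offsets periodically
                    .commitBatchSize(10)
                    .subscription(Collections.singleton("demo-topic")); // placeholder topic

            KafkaReceiver.create(options)
                    .receive()
                    // concatMap keeps the records in order, so an offset is acknowledged only
                    // after every earlier record on the same partition has been processed
                    .concatMap(record -> handleRecord(record)
                            .doOnSuccess(v -> record.receiverOffset().acknowledge()))
                    .blockLast(); // keep the demo consuming; a real service would hold the Disposable instead
        }

        // stand-in for the application's per-record processing
        private static Mono<Void> handleRecord(ReceiverRecord<String, String> record) {
            return Mono.fromRunnable(() -> System.out.println(record.value()));
        }
    }

With acknowledge() plus commitInterval/commitBatchSize, offsets are committed by the receiver itself, and an offset is never committed before its record has been handled.
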

Why create a list of receivers?

The consumers are created per partition, so we have separate control over the number of consumers and the number of partitions.
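
To make that concrete: reactor-kafka supports pinning a fixed partition to each receiver through ReceiverOptions.assignment(...). The sketch below only illustrates the idea; the topic name, partition count and the consumerProps map are assumptions, not taken from the question.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    class PartitionAssignedReceivers {

        // builds one receiver per partition of the given topic, rather than five identical
        // group subscribers that rely on a rebalance to spread partitions between them
        static List<KafkaReceiver<String, String>> create(Map<String, Object> consumerProps,
                                                          String topic, int partitionCount) {
            List<KafkaReceiver<String, String>> receivers = new ArrayList<>();
            for (int partition = 0; partition < partitionCount; partition++) {
                ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(consumerProps)
                        .assignment(Collections.singleton(new TopicPartition(topic, partition)));
                receivers.add(KafkaReceiver.create(options));
            }
            return receivers;
        }
    }
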

Is it reproducible in the local environment?

No.

I am looking for the reason why some events are lost when I start the service with this consumer.
Steps to reproduce on a server:
1. Stop the consumer/service.
2. Push events to the topic.
3. Start the consumer.
Tags: reactive-programming, spring-kafka, project-reactor, reactive, reactor-kafka