Kafka消费者偏移提交问题

问题描述 投票:0回答:1

对于使用 Reactor Kafka 的 Spring WebFlux,我为我的消费者提供了以下代码:

public EventConsumer(KafkaReceiver<String, String> inputEventReceiver,
                     MessageHelper messageHelper,
                     List<EventProcessor> eventProcessors) {
    this.inputEventReceiver = inputEventReceiver;
    this.messageHelper = messageHelper;
    this.eventProcessorChain = new EventProcessorChain(eventProcessors);
}

public Disposable consumeMessage() {
    return processRecord()
            .onBackpressureBuffer(BUFFER_SIZE)
            .limitRate(MAX_BATCH_SIZE)
            .retryWhen(Retry.indefinitely())
            .subscribe(record -> {}, error -> log.error("error while consuming event with message {}", error.getMessage()));
}

public Flux<EventWrapper> processRecord() {
    Flux<ReceiverRecord<String, String>> receiverRecord = Flux.defer(inputEventReceiver::receive);
    return receiverRecord.flatMap(this::processMessage);
}

private Flux<EventWrapper> processMessage(final ReceiverRecord<String, String> receiverRecord) {
    EventWrapper eventWrapper = messageHelper.fetchDataFromRecord(receiverRecord);
    return processEventThroughChain(eventWrapper, receiverRecord);
}

private Flux<EventWrapper> processEventThroughChain(EventWrapper eventWrapper, ReceiverRecord<String, String> receiverRecord) {
    return eventProcessorChain.processEvent(eventWrapper)
            .filter(EventWrapper::isValid)
            .doOnNext(result -> receiverRecord.receiverOffset().acknowledge()).flux();
}

MAX_BATCH_SIZE
的值我保留为500。问题是当这个消费者开始消费时,如果事件失败(在我的例子中isValid为假),我不会提交它的偏移量,因为我希望事件重试。

但是发生的情况是,在这批事件中,如果中间的事件失败并且没有提交,它仍然不会重试,因为同一批中此之后的事件被提交。

我能想到的唯一解决方案是有一个重试逻辑,但是如果我不想错过该事件,我必须继续重试。这将占用我的一个消费者。如果我在重试后继续消费事件,这个事件将会丢失,所以这就是为什么我暂时无限期地重试,并且目前需要手动干预该问题。

那么,有什么方法可以在 WebFlux 中处理这个问题吗?对我来说似乎正确的解决方案是使用 DLQ(死信队列),但除此之外是否可以在 Reactor 端处理它?

spring-webflux spring-kafka project-reactor
1个回答
0
投票

不会重试,因为您需要将方法包装在

Flux.defer(() -> processRecord())...
fromSuplier
中。您可以查看这篇文章了解更多详细信息:https://www.woolha.com/tutorials/project-reactor-using-retry-and-retrywhen-examples

© www.soinside.com 2019 - 2024. All rights reserved.