Kafka Receiver fails to consume messages from the topic after Kafka restarts


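Initially the Kafka receiver works as expected, but once the Kafka broker restarts and comes back up, the same receiver no longer receives messages. The logs show nothing beyond warnings; no errors appear.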

    ReceiverOptions<String, byte[]> receiverConfig = getReceiverConfig(topic);
    pipelineStatusKafkaReceiver = KafkaReceiver.create(
        receiverConfig.subscription(Collections.singleton(topic)));
    pipelineStatusKafkaReceiver.receive()
        .retryWhen(Retry.backoff(300, Duration.of(10L, ChronoUnit.SECONDS)))
        .doOnError(throwable -> LOGGER.error("Error from retry"))
        .subscribe(
            receiverRecord -> Mono.just("Kafka: Listening Pipeline status on {}")
                .doOnEach(logOnNext(s -> LOGGER.info(s, topic)))
                .flatMap(s -> updatePipelineStatus(receiverRecord))
                .doOnSuccess(receiveRecord -> receiverRecord.receiverOffset().acknowledge())
                .subscribe());
apache-kafka spring-webflux kafka-consumer-api reactor-kafka
1 Answer

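This implementation calls subscribeAndRetryOnError recursively whenever an error occurs, which effectively restarts the subscription after a disconnect. The Disposable helps manage the subscription lifecycle so that resources are not leaked.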

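    // Imports needed by this fragment:
    import java.util.Collections;
    import reactor.core.Disposable;
    import reactor.core.publisher.Mono;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    // Handle to the current subscription; volatile because the error
    // callback may run on a different thread than the initial subscribe.
    private volatile Disposable disposable;

    public void subscribeAndUpdatePipelineStatues(String topic) {
        ReceiverOptions<String, byte[]> receiverConfig = getReceiverConfig(topic);
        pipelineStatusKafkaReceiver = KafkaReceiver.create(
            receiverConfig.subscription(Collections.singleton(topic)));
        disposable = subscribeAndRetryOnError(topic);
    }

    private Disposable subscribeAndRetryOnError(String topic) {
        return pipelineStatusKafkaReceiver.receive()
            // Process records one at a time, in order.
            .concatMap(receiverRecord ->
                Mono.just("Kafka: Listening Pipeline status on {}")
                    .doOnNext(s -> LOGGER.info(s, topic))
                    .then(updatePipelineStatus(receiverRecord))
                    // Acknowledge the offset only after processing succeeds.
                    .doOnSuccess(v -> receiverRecord.receiverOffset().acknowledge())
                    // A failure on one record is logged and skipped so that
                    // it does not terminate the whole receive() stream.
                    .onErrorResume(throwable -> {
                        LOGGER.error("Error processing record", throwable);
                        return Mono.empty();
                    })
            )
            // An error reaching this point has terminated the receiver stream.
            .doOnError(throwable -> {
                LOGGER.error("Receiver terminated with error, restarting...", throwable);
                // The field may still be unset if the error fires before
                // the first assignment completes.
                Disposable previous = disposable;
                if (previous != null) {
                    previous.dispose();
                }
                disposable = subscribeAndRetryOnError(topic);
            })
            .subscribe();
    }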
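
For intuition about the Retry.backoff(300, Duration.of(10L, ChronoUnit.SECONDS)) call in the question, here is a minimal sketch, in plain Java with no Reactor dependency, of how an exponential backoff schedule grows. BackoffSchedule, delayFor, and the 24-hour cap are hypothetical names and values chosen for illustration. The model simply doubles the minimum delay on each attempt and leaves out the random jitter that Reactor applies, so treat it as an approximation rather than Reactor's actual implementation.

```java
import java.time.Duration;

// Simplified, hypothetical model of an exponential backoff schedule.
// This is NOT Reactor code; it only illustrates how the delays grow.
final class BackoffSchedule {

    // Delay before retry number `attempt` (0-based):
    // min * 2^attempt, capped at max. Jitter is deliberately omitted.
    static Duration delayFor(int attempt, Duration min, Duration max) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long minMillis = min.toMillis();
        long maxMillis = max.toMillis();
        if (minMillis <= 0) {
            throw new IllegalArgumentException("min must be positive");
        }
        // For very large attempt numbers the factor would overflow a long,
        // and the result would exceed any realistic cap anyway.
        if (attempt >= 62) {
            return max;
        }
        long factor = 1L << attempt;
        // Compare by division so the multiplication below cannot overflow.
        if (minMillis > maxMillis / factor) {
            return max;
        }
        return Duration.ofMillis(minMillis * factor);
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(10); // minimum backoff from the question
        Duration max = Duration.ofHours(24);   // assumed cap, for illustration only
        for (int attempt = 0; attempt < 16; attempt++) {
            System.out.println("retry " + attempt + " -> " + delayFor(attempt, min, max));
        }
    }
}
```

Under this model the delay at attempt index 10 is already 10240 seconds, close to three hours, so a receiver that has failed several times in a row may look stalled while a retry is in fact still pending.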