根据条件暂停和恢复 ReactiveKafkaReceiver

问题描述 投票:0回答:1

我正在尝试为 kafka 消费者实现一个带有 resilence4j 的断路器,该消费者又调用下游 API。 kafka 消费者和 API 都是响应式的。

要求:应该有一个断路器,以防下游API抛出异常,并且如果电路打开,kafka消费者应该暂停。一旦电路回到关闭状态,消费者应该恢复。

这是我的消费者:

@EventListener(ApplicationStartedEvent.class) 公共无效消费(){

    reactiveKafkaReceiver
            .doOnNext(record -> {
                log.info("Reading offset {}, message:{}", record.offset(),record.value());
            })
            .concatMap(r ->
                this.processRecord(r)
                        .doOnSuccess(event -> r.receiverOffset().acknowledge())
                        .doOnError(error -> log.error("Error processing consumer events: {} ", error.getMessage()))
                        .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true))
                        .onErrorResume(error -> {
                            log.error("Retries exhausted for " + r.value());
                            //deadLetterPublishingRecoverer.accept(r, new Exception(error));
                            //r.receiverOffset().acknowledge();
                            return Mono.empty();
                        }))
            .repeat()
            .subscribe();
}

我已经能够在 processRecord() 方法上实现断路器。问题是,我无法根据断路器事件暂停和恢复 kafka 消费者。

我已经能够使用 KafkaListenerEndpointRegistry 和 messageListenerContainers 在非反应式 kafka 中做到这一点。我遇到了反应式卡夫卡的障碍。任何帮助将不胜感激。谢谢!

spring-kafka resilience4j reactive-kafka
1个回答
0
投票

我看到您提取了

reactiveKafkaReceiver
变量。因此,要对其
Consumer
执行暂停,您应该使用此 API:

    receiver.doOnConsumer(consumer -> {
        consumer.pause(...);
        return null;
    })

请参阅

doOnConsumer
Javadocs 中的更多信息:

/**
 * Invokes the specified function on the Kafka {@link Consumer} associated with this {@link KafkaReceiver}.
 * The function is scheduled when the returned {@link Mono} is subscribed to. The function is
 * executed on the thread used for other consumer operations to ensure that {@link Consumer}
 * is never accessed concurrently from multiple threads.
 * <p>
 * Example usage:
 * <pre>
 * {@code
 *     receiver.doOnConsumer(consumer -> consumer.partitionsFor(topic))
 *             .doOnSuccess(partitions -> System.out.println("Partitions " + partitions));
 * }
 * </pre>
 * Functions that are directly supported through the reactive {@link KafkaReceiver} interface
 * like <code>poll</code> and <code>commit</code> should not be invoked from <code>function</code>.
 * The methods supported by <code>doOnConsumer</code> are:
 * <ul>
 *   <li>{@link Consumer#assignment()}
 *   <li>{@link Consumer#subscription()}
 *   <li>{@link Consumer#seek(org.apache.kafka.common.TopicPartition, long)}
 *   <li>{@link Consumer#seekToBeginning(java.util.Collection)}
 *   <li>{@link Consumer#seekToEnd(java.util.Collection)}
 *   <li>{@link Consumer#position(org.apache.kafka.common.TopicPartition)}
 *   <li>{@link Consumer#committed(org.apache.kafka.common.TopicPartition)}
 *   <li>{@link Consumer#metrics()}
 *   <li>{@link Consumer#partitionsFor(String)}
 *   <li>{@link Consumer#listTopics()}
 *   <li>{@link Consumer#paused()}
 *   <li>{@link Consumer#pause(java.util.Collection)}
 *   <li>{@link Consumer#resume(java.util.Collection)}
 *   <li>{@link Consumer#offsetsForTimes(java.util.Map)}
 *   <li>{@link Consumer#beginningOffsets(java.util.Collection)}
 *   <li>{@link Consumer#endOffsets(java.util.Collection)}
 * </ul>
 *
 * @param function A function that takes Kafka {@link Consumer} as parameter
 * @return Mono that completes with the value returned by <code>function</code>
 */
<T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function);
© www.soinside.com 2019 - 2024. All rights reserved.