我正在尝试为 kafka 消费者实现一个带有 resilence4j 的断路器,该消费者又调用下游 API。 kafka 消费者和 API 都是响应式的。
要求:应该有一个断路器,以防下游API抛出异常,并且如果电路打开,kafka消费者应该暂停。一旦电路回到关闭状态,消费者应该恢复。
这是我的消费者:
@EventListener(ApplicationStartedEvent.class) 公共无效消费(){
reactiveKafkaReceiver
.doOnNext(record -> {
log.info("Reading offset {}, message:{}", record.offset(),record.value());
})
.concatMap(r ->
this.processRecord(r)
.doOnSuccess(event -> r.receiverOffset().acknowledge())
.doOnError(error -> log.error("Error processing consumer events: {} ", error.getMessage()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true))
.onErrorResume(error -> {
log.error("Retries exhausted for " + r.value());
//deadLetterPublishingRecoverer.accept(r, new Exception(error));
//r.receiverOffset().acknowledge();
return Mono.empty();
}))
.repeat()
.subscribe();
}
我已经能够在 processRecord() 方法上实现断路器。问题是,我无法根据断路器事件暂停和恢复 kafka 消费者。
我已经能够使用 KafkaListenerEndpointRegistry 和 messageListenerContainers 在非反应式 kafka 中做到这一点。我遇到了反应式卡夫卡的障碍。任何帮助将不胜感激。谢谢!
我看到您提取了
reactiveKafkaReceiver
变量。因此,要对其 Consumer
执行暂停,您应该使用此 API:
receiver.doOnConsumer(consumer -> {
consumer.pause(...);
return null;
})
请参阅
doOnConsumer
Javadocs 中的更多信息:
/**
* Invokes the specified function on the Kafka {@link Consumer} associated with this {@link KafkaReceiver}.
* The function is scheduled when the returned {@link Mono} is subscribed to. The function is
* executed on the thread used for other consumer operations to ensure that {@link Consumer}
* is never accessed concurrently from multiple threads.
* <p>
* Example usage:
* <pre>
* {@code
* receiver.doOnConsumer(consumer -> consumer.partitionsFor(topic))
* .doOnSuccess(partitions -> System.out.println("Partitions " + partitions));
* }
* </pre>
* Functions that are directly supported through the reactive {@link KafkaReceiver} interface
* like <code>poll</code> and <code>commit</code> should not be invoked from <code>function</code>.
* The methods supported by <code>doOnConsumer</code> are:
* <ul>
* <li>{@link Consumer#assignment()}
* <li>{@link Consumer#subscription()}
* <li>{@link Consumer#seek(org.apache.kafka.common.TopicPartition, long)}
* <li>{@link Consumer#seekToBeginning(java.util.Collection)}
* <li>{@link Consumer#seekToEnd(java.util.Collection)}
* <li>{@link Consumer#position(org.apache.kafka.common.TopicPartition)}
* <li>{@link Consumer#committed(org.apache.kafka.common.TopicPartition)}
* <li>{@link Consumer#metrics()}
* <li>{@link Consumer#partitionsFor(String)}
* <li>{@link Consumer#listTopics()}
* <li>{@link Consumer#paused()}
* <li>{@link Consumer#pause(java.util.Collection)}
* <li>{@link Consumer#resume(java.util.Collection)}
* <li>{@link Consumer#offsetsForTimes(java.util.Map)}
* <li>{@link Consumer#beginningOffsets(java.util.Collection)}
* <li>{@link Consumer#endOffsets(java.util.Collection)}
* </ul>
*
* @param function A function that takes Kafka {@link Consumer} as parameter
* @return Mono that completes with the value returned by <code>function</code>
*/
<T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function);