最初,Kafka 接收器按预期工作,但是一旦 Kafka 代理重新启动并返回,同一个接收器就无法接收消息,日志中除了 warn 之外没有看到任何错误
ReceiverOptions<String, byte[]> receiverConfig = getReceiverConfig(topic);
pipelineStatusKafkaReceiver = KafkaReceiver.create(
receiverConfig.subscription(Collections.singleton(topic)));
pipelineStatusKafkaReceiver.receive()
.retryWhen(Retry.backoff(300, Duration.of(10L, ChronoUnit.SECONDS)))
.doOnError(throwable -> LOGGER.error("Error from retry"))
.subscribe(
receiverRecord -> Mono.just("Kafka: Listening Pipeline status on {}")
.doOnEach(logOnNext(s -> LOGGER.info(s, topic)))
.flatMap(s -> updatePipelineStatus(receiverRecord))
.doOnSuccess(receiveRecord -> `receiverRecord.receiverOffset().acknowledge())`
.subscribe());
此实现将在发生错误时递归调用
subscribeAndRetryOnError
,在断开连接时有效地重新启动订阅。 Disposable
将帮助管理订阅生命周期以防止资源泄漏。
private Disposable disposable;
public void subscribeAndUpdatePipelineStatues(String topic) {
ReceiverOptions<String, byte[]> receiverConfig = getReceiverConfig(topic);
pipelineStatusKafkaReceiver = KafkaReceiver.create(receiverConfig.subscription(Collections.singleton(topic)));
disposable = subscribeAndRetryOnError(topic);
}
private Disposable subscribeAndRetryOnError(String topic) {
return pipelineStatusKafkaReceiver.receive()
.concatMap(receiverRecord ->
Mono.just("Kafka: Listening Pipeline status on {}")
.doOnNext(s -> LOGGER.info(s, topic))
.then(updatePipelineStatus(receiverRecord))
.doOnSuccess(v -> receiverRecord.receiverOffset().acknowledge())
.onErrorResume(throwable -> {
LOGGER.error("Error processing record", throwable);
return Mono.empty();
})
)
.doOnError(throwable -> {
LOGGER.error("Receiver terminated with error, restarting...", throwable);
disposable.dispose();
disposable = subscribeAndRetryOnError(topic);
})
.subscribe();
}