我使用 Spring Boot 应用程序和 Kotlin lang。我想通过反应式 Kafka 消费者接收消息,然后将这些消息传递给挂起的 Kotlin 函数并在那里使用协程,然后获取该挂起函数的结果并将其传递给反应式 Kafka 生产者。
我不能只在反应流函数中调用挂起的函数,我需要某种桥梁......
我从 kotlinx-coroutines-reactive 中找到了“mono { }”函数,它可以工作,但我应该在这个函数上调用 subscribe() ,但这不好,因为不建议在非阻塞上下文中调用“subscribe()”。
如果我的挂起函数发生错误,消费者会完全崩溃,我无法处理错误。但我想重试。
有什么建议或示例如何正确执行并进行错误处理?
我的反应式 Kafka 消费者:
reactiveKafkaConsumer
.receive()
.doOnNext {
// mono { ???
processMessage(it.value()).also {
sendMessage(it)
}
// }.subscribe() ???
}
.doOnError {
kLogger.log{ "log error" }
}
.retryWhen(Retry.max(3).transientErrors(true))
.onErrorResume {
kLogger.log{ "log error" }
Mono.empty()
}
.repeat()
.subscribe()
我暂停的 Kotlin 功能:
suspended fun processMessage(msg: InputMessage): OutputMessage =
withContext(CoroutineScope(Dispatchers.Default).coroutineContext) {
msg.bigCollection.map {
async {
someOps(it)
}
}.awaitAll().let {
OutputMessage(it)
}
}
我的反应式卡夫卡生产者:
fun sendMessage(msg: OutputMessage) =
reactiveKafkaProducer
.send(topicName, msg)
.doOnSuccess {
kLogger.log{ "Sent successfully" }
}
.subscribe()
我们必须找到一个可以传递“mono {}”而不调用“.subscribe()”的地方,这个地方就是“.flapMap {}”:
reactiveKafkaConsumer
.receive()
// .doOnNext { change it to flatMap
.flatMap {
mono {
processMessage(it.value()).also {
sendMessage(it)
}
}
}
.doOnError {
kLogger.log{ "log error" }
}
.retryWhen(Retry.max(3).transientErrors(true))
.onErrorResume {
kLogger.log{ "log error" }
Mono.empty()
}
.repeat()
.subscribe()