在反应流中调用 Kotlin 挂起函数

问题描述 投票:0回答:1

我使用 Spring Boot 应用程序和 Kotlin lang。我想通过反应式 Kafka 消费者接收消息,然后将这些消息传递给挂起的 Kotlin 函数并在那里使用协程,然后获取该挂起函数的结果并将其传递给反应式 Kafka 生产者。

我不能只在反应流函数中调用挂起的函数,我需要某种桥梁......

我从 kotlinx-coroutines-reactive 中找到了“mono { }”函数,它可以工作,但我应该在这个函数上调用 subscribe() ,但这不好,因为不建议在非阻塞上下文中调用“subscribe()”。

如果我的挂起函数发生错误,消费者会完全崩溃,我无法处理错误。但我想重试。

有什么建议或示例如何正确执行并进行错误处理?

我的反应式 Kafka 消费者:

reactiveKafkaConsumer
    .receive()
    .doOnNext {
        // mono { ???

        processMessage(it.value()).also {
            sendMessage(it)
        }

        // }.subscribe() ???
    }
    .doOnError {
        kLogger.log{ "log error" }
    }
    .retryWhen(Retry.max(3).transientErrors(true))
    .onErrorResume {
        kLogger.log{ "log error" }
        Mono.empty()
    }
    .repeat()
    .subscribe()

我暂停的 Kotlin 功能:

suspended fun processMessage(msg: InputMessage): OutputMessage =
    withContext(CoroutineScope(Dispatchers.Default).coroutineContext) {
        msg.bigCollection.map {
            async {
                someOps(it)
            }
        }.awaitAll().let {
            OutputMessage(it)
        }
    }

我的反应式卡夫卡生产者:

fun sendMessage(msg: OutputMessage) =
    reactiveKafkaProducer
        .send(topicName, msg)
        .doOnSuccess {
            kLogger.log{ "Sent successfully" }
        }
        .subscribe()
kotlin apache-kafka spring-webflux kotlin-coroutines spring-kafka
1个回答
0
投票

我们必须找到一个可以传递“mono {}”而不调用“.subscribe()”的地方,这个地方就是“.flapMap {}”:

reactiveKafkaConsumer
    .receive()
    // .doOnNext { change it to flatMap
    .flatMap {
        mono {
            processMessage(it.value()).also {
                sendMessage(it)
            }
        }
    }
    .doOnError {
        kLogger.log{ "log error" }
    }
    .retryWhen(Retry.max(3).transientErrors(true))
    .onErrorResume {
        kLogger.log{ "log error" }
        Mono.empty()
    }
    .repeat()
    .subscribe()
© www.soinside.com 2019 - 2024. All rights reserved.