带有请求队列的Kotlin服务

问题描述 投票:0回答:2

我想用以下API设计一个服务:

suspend fun getUsers(request: Request): List<User>

在引擎盖下,我会向服务器发送一个请求(无论如何,但让我们说它是一个反应性的WebClient),但这里有一个技巧:我只能每隔500毫秒发送一次请求,否则我会收到一个错误。

有人可以推荐我如何实现它,当我从协程中调用getUsers它暂停时,工作单元被添加到具有此方法的服务的某个队列中,然后在某个时间点实现并返回结果?

我假设我可以使用一些ReceiveChannel作为队列,有一个for循环的元素与delay里面,但我有点迷失在哪里放这个逻辑。这应该像一个背景方法,将永远运行并被getUsers调用?可能永远不会调用close方法,所以这个方法也可以暂停,但是如何将值从这个无限运行方法传递回需要结果的getUsers

编辑

目前我正在考虑这样的解决方案:

private const val REQUEST_INTERVAL = 500

@Service
class DelayedRequestSenderImpl<T> : DelayedRequestSender<T> {
    private var lastRequestTime: LocalDateTime = LocalDateTime.now()
    private val requestChannel: Channel<Deferred<T>> = Channel()

    override suspend fun requestAsync(block: () -> T): Deferred<T> {
        val deferred = GlobalScope.async(start = CoroutineStart.LAZY) { block() }
        requestChannel.send(deferred)
        return deferred
    }

    @PostConstruct
    private fun startRequestProcessing() = GlobalScope.launch {
        for (request in requestChannel) {
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < REQUEST_INTERVAL) {
                delay(REQUEST_INTERVAL - diff)
                lastRequestTime = now
            }
            request.start()
        }
    }
}

我在这里看到的问题是,我必须对类进行生成以使requestChannel成为通用的,因为请求的结果可能是任何东西。但这意味着DelayedRequestSender的每个实例都将绑定到特定类型。关于如何避免这种情况的任何建议?

编辑2

这是一个精致的版本。我目前看到的唯一可能的流程是我们必须公开@PostConstruct方法,以便在我们想要或使用反射时编写任何测试。

我的想法是不使用GlobalScope,并且还有一个单独的Job用于处理方法。这是一个很好的方法吗?

interface DelayingSupplier {
    suspend fun <T> supply(block: () -> T): T
}

@Service
class DelayingSupplierImpl(@Value("\${vk.request.interval}") private val interval: Int) : DelayingSupplier {
    private var lastRequestTime: LocalDateTime = LocalDateTime.now()
    private val requestChannel: Channel<Deferred<*>> = Channel()
    private val coroutineScope = CoroutineScope(EmptyCoroutineContext)

    override suspend fun <T> supply(block: () -> T): T {
        val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
        requestChannel.send(deferred)
        return deferred.await()
    }

    @PostConstruct
    fun startProcessing() = coroutineScope.launch(context = Job(coroutineScope.coroutineContext[Job])) {
        for (request in requestChannel) {
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < interval) {
                delay(interval - diff)
            }
            lastRequestTime = LocalDateTime.now()
            request.start()
        }
    }
}
kotlin architecture future deferred kotlin-coroutines
2个回答
1
投票

我建议:

  • 将您的泛型推向功能级别
  • 使用actor代替你的coroutine实现(但你可能更喜欢这个)。

无论哪种方式,此解决方案都应该允许您使用队列的单个实例来处理所有请求的延迟,而不管返回类型如何。 (道歉,我重命名了一些东西来帮助我自己的概念化,希望这仍然有意义):

private const val REQUEST_INTERVAL = 500

interface DelayedRequestHandler {

    suspend fun <T> handleWithDelay(block: () -> T): T

}

class DelayedRequestHandlerImpl(requestInterval: Int = REQUEST_INTERVAL) : DelayedRequestHandler, CoroutineScope {
    private val job = Job()
    override val coroutineContext = Dispatchers.Unconfined + job
    private val delayedHandlerActor = delayedRequestHandlerActor(requestInterval)

    override suspend fun <T> handleWithDelay(block: () -> T): T {
        val result = CompletableDeferred<T>()
        delayedHandlerActor.send(DelayedHandlerMsg(result, block))
        return result.await()
    }
}

private data class DelayedHandlerMsg<RESULT>(val result: CompletableDeferred<RESULT>, val block: () -> RESULT)

private fun CoroutineScope.delayedRequestHandlerActor(requestInterval: Int) = actor<DelayedHandlerMsg<*>>() {
    var lastRequestTime: LocalDateTime = LocalDateTime.now()
    for (message in channel) {
        try {
            println("got a message processing")
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < requestInterval) {
                delay(requestInterval - diff)
            }
            lastRequestTime = LocalDateTime.now()
            @Suppress("UNCHECKED_CAST")
            val msgCast = message as DelayedHandlerMsg<Any?>
            val result = msgCast.block()
            println(result)
            msgCast.result.complete(result)
        } catch (e: Exception) {
            message.result.completeExceptionally(e)
        }
    }
}


fun main() = runBlocking {
    val mydelayHandler = DelayedRequestHandlerImpl(2000)
    val jobs = List(10) {
        launch {
            mydelayHandler.handleWithDelay {
                "Result $it"
            }
        }
    }
    jobs.forEach { it.join() }
}

0
投票

所以这是我提出的最终实施。请注意SupevisorJob,因为如果其中一个请求失败,我们不希望处理停止,这完全可能并且很好(至少在我的情况下)。

此外,@ Laurence建议的选项可能会更好,但由于API被标记为过时,我决定暂时不使用actor。

@Service
class DelayingRequestSenderImpl(@Value("\${vk.request.interval}") private val interval: Int) : DelayingRequestSender {
    private var lastRequestTime: LocalDateTime = LocalDateTime.now()
    private val requestChannel: Channel<Deferred<*>> = Channel()
    //SupervisorJob is used because we want to have continuous processing of requestChannel
    //even if one of the requests fails
    private val coroutineScope = CoroutineScope(SupervisorJob())

    override suspend fun <T> request(block: () -> T): T {
        val deferred = coroutineScope.async(start = CoroutineStart.LAZY) { block() }
        requestChannel.send(deferred)
        return deferred.await()
    }

    @PostConstruct
    fun startProcessing() = coroutineScope.launch {
        for (request in requestChannel) {
            val now = LocalDateTime.now()
            val diff = ChronoUnit.MILLIS.between(lastRequestTime, now)
            if (diff < interval) {
                delay(interval - diff)
            }
            lastRequestTime = LocalDateTime.now()
            request.start()
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.