How to effectively implement rate limiting across multiple coroutines?

Question · Votes: 0 · Answers: 3

Suppose I have a bunch of running coroutines that interact with some web service, and since I don't want to spam that service, I want to limit requests to at most 1 request per x seconds. For that I could use code like this:

fun CoroutineScope.rateLimiter(tokens: SendChannel<Unit>, rate: Int) = launch {
    var lastToken = System.currentTimeMillis()
    while (isActive) {
        val currentTime = System.currentTimeMillis()
        if (currentTime - lastToken < rate) {
            // wait out the remainder of the interval before handing out the next token
            delay(rate - (currentTime - lastToken))
        }
        tokens.send(Unit)
        lastToken = System.currentTimeMillis()
    }
}

fun CoroutineScope.request(tokens: ReceiveChannel<Unit>) = launch { 
    for (token in tokens) {
        //Do Web request
    }
}
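
For context, wiring these two builders together might look roughly like this (a sketch that is not part of the original question; the rendezvous channel, the 1-second rate, and the worker count are arbitrary choices, and the pipeline runs until its scope is cancelled):

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope

suspend fun startPipeline() = coroutineScope {
    // Rendezvous channel: send() suspends until a requester is ready to receive,
    // so at most one token is handed out per `rate` milliseconds.
    val tokens = Channel<Unit>()
    rateLimiter(tokens, rate = 1_000)
    repeat(4) { request(tokens) } // four workers sharing the same token stream
}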

1.) Is this approach efficient?

2.) Couldn't this be extended to limit something to x bytes/second, where I would need to request x tokens from a token bucket? What is the best way to implement something like that with coroutines?
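
To make question 2 concrete, a weighted variant (where a caller asks for, say, as many tokens as the bytes it is about to send) might look roughly like this. It is only a sketch: the class name, the acquire(permits) signature, the polling delay, and the one-second burst cap are assumptions, not something from the question or the answers.

import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical weighted token bucket: acquire(permits) suspends until the bucket
// has refilled enough to cover the requested amount (assumes permits <= tokensPerSecond).
class WeightedRateLimiter(private val tokensPerSecond: Double) {
    private val mutex = Mutex()
    private var available = 0.0
    private var lastRefill = System.nanoTime()

    suspend fun acquire(permits: Int) {
        while (true) {
            mutex.withLock {
                val now = System.nanoTime()
                // Refill proportionally to elapsed time, capped at one second's worth.
                available = minOf(
                    tokensPerSecond,
                    available + (now - lastRefill) / 1e9 * tokensPerSecond
                )
                lastRefill = now
                if (available >= permits) {
                    available -= permits
                    return
                }
            }
            delay(50) // poll until enough tokens have accumulated
        }
    }
}

A bytes/second limit would then be WeightedRateLimiter(bytesPerSecond) with acquire(payload.size) before each send.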

kotlin kotlinx.coroutines
3 Answers
1 vote

If you want to skip the dependency on jobs and channels, which can produce more permits than are consumed and then cause a stampede once some process starts picking up permits, maybe this is the solution for you.

(There is some JVM flavor here, but it can be replaced with multiplatform equivalents.)


import kotlin.math.max
import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

class SimpleRateLimiter(eventsPerSecond: Double) {

    private val mutex = Mutex()

    @Volatile
    private var next: Long = Long.MIN_VALUE
    private val delayNanos: Long = (1_000_000_000L / eventsPerSecond).toLong()

    /**
     * Suspend the current coroutine until its calculated time of exit
     * from the rate limiter.
     */
    suspend fun acquire() {
        val now: Long = System.nanoTime()
        val until = mutex.withLock {
            max(next, now).also {
                next = it + delayNanos
            }
        }
        if (until != now) {
            delay((until - now) / 1_000_000)
        }
    }
}
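
A minimal usage sketch for the class above (the rate, the URL list, and the println standing in for the web request are placeholders, not from the original answer):

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Launch one coroutine per URL; each waits its turn on the shared limiter.
suspend fun crawl(urls: List<String>) = coroutineScope {
    val limiter = SimpleRateLimiter(eventsPerSecond = 2.0)
    for (url in urls) {
        launch {
            limiter.acquire()          // suspends until this request's slot comes up
            println("requesting $url") // stand-in for the actual web request
        }
    }
}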

It also comes with other trade-offs:

  • Behavior when nanoTime gets close to Long.MAX_VALUE is most certainly broken.
  • No notion of a maximum delay / timeout.
  • No way to acquire multiple permits at once.
  • No tryAcquire implementation.

If you want an IntervalLimiter that allows X requests every Y seconds and then throws an exception, there is a RateLimiter in Resilience4J. Or, if you want something more fully featured, I am working on a PR to create a RateLimiter and an IntervalLimiter in the coroutines core project.
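
For reference, using Resilience4J's RateLimiter from a coroutine might look roughly like this. This is a sketch assuming the resilience4j-ratelimiter and resilience4j-kotlin artifacts; check the library's documentation for the current API before relying on it:

import io.github.resilience4j.kotlin.ratelimiter.executeSuspendFunction
import io.github.resilience4j.ratelimiter.RateLimiter
import io.github.resilience4j.ratelimiter.RateLimiterConfig
import java.time.Duration

val config = RateLimiterConfig.custom()
    .limitRefreshPeriod(Duration.ofSeconds(1)) // Y: length of one period
    .limitForPeriod(10)                        // X: permits per period
    .timeoutDuration(Duration.ofSeconds(5))    // how long a caller may wait before failing
    .build()

val webServiceLimiter = RateLimiter.of("webService", config)

suspend fun limitedRequest(): String =
    webServiceLimiter.executeSuspendFunction {
        "response" // stand-in for the actual web request
    }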


0 votes

(Hello from feature request Kotlin/kotlinx.coroutines#460!)
Here is the rate limiter I keep in an internal kitchen-sink library. Simple*, multiplatform, no extra dependencies:

import kotlin.time.Duration
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow

public fun <T> rateLimiter(resource: T, minDelay: Duration): Flow<T>
    = flow {
        while (true) {
            emit(resource)
            delay(minDelay)
        }
    }.conflate()

Usage:

val endpoints = listOf("dokka", "kotlinx.coroutines", "kotlinx.serialization").map { "https://api.github.com/repos/Kotlin/$it/events" }
val responses = rateLimiter(HttpClient(), 500.milliseconds).associateWithEach(endpoints)
    .map { (httpClient, uri) -> httpClient.get(uri).body<String>().also { println(it.length) } }
    .toList()
println("got ${responses.size} responses")

// * and the asterisk: I don't think `rateLimiter` is useful unless paired with the not-so-simple `associateWithEach` extension, which looks like this:
public fun <C, T> Flow<C>.associateWithEach(iterable: Iterable<T>): Flow<Pair<C, T>>
    = iterable.iterator().let { iter ->
        this.transformWhile { recvElem ->
            if (iter.hasNext()) emit(Pair(recvElem, iter.next()))
            iter.hasNext()
        }
    }
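
If the kitchen-sink helper is more than you need, roughly the same pairing can be done with the standard zip operator (a sketch, not from the original answer):

import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.zip

suspend fun demo(endpoints: List<String>) {
    rateLimiter(Unit, 500.milliseconds)
        .zip(endpoints.asFlow()) { _, uri -> uri } // one endpoint per tick
        .collect { uri -> println("requesting $uri") }
}

zip completes once the endpoint flow is exhausted, which cancels the infinite limiter flow, essentially what transformWhile achieves inside associateWithEach.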

0 votes

I used this simple implementation of the bucket algorithm. It suspends the coroutine until new tokens are available for requests.

import kotlinx.coroutines.delay
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock

class SimpleBucketRateLimiter(
    private val numberOfRequest: Int,
    private val timePerRequestMills: Long,
) {

    private var lastRefillTime = 0L
    private var nextRefillTime = 0L
    private var tokens = numberOfRequest
    private val mutex = Mutex()

    suspend fun tryConsume() {
        mutex.withLock {
            refill()

            // Wait until a refill makes tokens available, then consume one.
            while (tokens <= 0) {
                println("RateLimiter: tokens not available, delay 100ms")
                delay(100)
                refill()
            }
            tokens--
        }
    }

    private fun refill() {
        val now = Clock.System.now().toEpochMilliseconds()
        if (now < nextRefillTime) {
            return
        }

        lastRefillTime = Clock.System.now().toEpochMilliseconds()
        nextRefillTime = lastRefillTime + timePerRequestMills
        tokens = numberOfRequest
    }
}
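
A usage sketch for the bucket above (the 5-per-second limit and the println standing in for the request are placeholders):

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// At most 5 requests per 1-second window, shared across all launched coroutines.
suspend fun fetchAll(urls: List<String>) = coroutineScope {
    val limiter = SimpleBucketRateLimiter(numberOfRequest = 5, timePerRequestMills = 1_000)
    urls.forEach { url ->
        launch {
            limiter.tryConsume()       // suspends until a token is available
            println("requesting $url") // stand-in for the actual web request
        }
    }
}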