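Suppose I have a bunch of running coroutines that interact with some web service, and since I don't want to spam it, I want to limit requests to at most 1 request every x seconds. For that I could use code like this: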
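    import kotlinx.coroutines.CoroutineScope
    import kotlinx.coroutines.channels.ReceiveChannel
    import kotlinx.coroutines.channels.SendChannel
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.isActive
    import kotlinx.coroutines.launch

    // Sends one token at most every `rate` milliseconds.
    fun CoroutineScope.rateLimiter(tokens: SendChannel<Unit>, rate: Int) = launch {
        var lastToken = System.currentTimeMillis()
        while (isActive) {
            val elapsed = System.currentTimeMillis() - lastToken
            if (elapsed < rate) {
                delay(rate - elapsed) // wait out the remainder of the interval
            }
            tokens.send(Unit)
            lastToken = System.currentTimeMillis()
        }
    }

    // Performs one web request per received token.
    fun CoroutineScope.request(tokens: ReceiveChannel<Unit>) = launch {
        for (token in tokens) {
            // Do web request
        }
    }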
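1.) Is this approach efficient?
2.) This can't be extended to limit something to x bytes/second, or to a case where I need to request x tokens from a token bucket. What is the best way to implement something like that with coroutines?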
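If you want to avoid depending on Jobs and Channels, which may generate more permits than are consumed and then cause a stampeding herd once some process starts taking permits, maybe this is the solution for you.
(There is some JVM style in here, but it can be replaced for multiplatform.)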
    import kotlin.math.max
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.sync.Mutex
    import kotlinx.coroutines.sync.withLock

    class SimpleRateLimiter(eventsPerSecond: Double) {

        private val mutex = Mutex()

        // Earliest time (System.nanoTime scale) at which the next caller may proceed.
        @Volatile
        private var next: Long = Long.MIN_VALUE
        private val delayNanos: Long = (1_000_000_000L / eventsPerSecond).toLong()

        /**
         * Suspend the current coroutine until its calculated time of exit
         * from the rate limiter
         */
        suspend fun acquire() {
            val now: Long = System.nanoTime()
            // Reserve a slot under the lock, then wait outside of it.
            val until = mutex.withLock {
                max(next, now).also {
                    next = it + delayNanos
                }
            }
            if (until != now) {
                delay((until - now) / 1_000_000) // nanoseconds to milliseconds
            }
        }
    }
It does come with other trade-offs.
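The question also asks about limiting to x bytes/second. One way the reservation idea above might be extended to weighted permits is sketched below. `PermitReservations` and `reserve` are hypothetical names that do not come from any library, the time is passed in so the arithmetic can be checked without sleeping, and the class is not thread-safe by itself.

```kotlin
import kotlin.math.max

/**
 * Hypothetical weighted variant of the reservation idea: each call reserves
 * `permits` units (for example bytes) and learns how long it should wait.
 * Not thread-safe by itself; calls would need a Mutex as in SimpleRateLimiter.
 */
class PermitReservations(permitsPerSecond: Double) {
    // Nanoseconds that one permit costs at the configured rate.
    private val nanosPerPermit: Double = 1_000_000_000.0 / permitsPerSecond

    // Earliest time (in nanoseconds) at which the next reservation may start.
    private var next: Long = Long.MIN_VALUE

    /** Reserves [permits] at time [nowNanos]; returns the nanoseconds to wait first. */
    fun reserve(permits: Int, nowNanos: Long): Long {
        require(permits > 0) { "permits must be positive" }
        val start = max(next, nowNanos)
        next = start + (permits * nanosPerPermit).toLong()
        return start - nowNanos
    }
}

fun main() {
    val limiter = PermitReservations(permitsPerSecond = 1000.0) // e.g. 1000 bytes/second
    println(limiter.reserve(500, nowNanos = 0L)) // first caller may start immediately
    println(limiter.reserve(500, nowNanos = 0L)) // queued behind the first 500 permits
    println(limiter.reserve(100, nowNanos = 2_000_000_000L)) // limiter was idle, no wait
}
```

Under these assumptions, a suspending `acquire(permits)` could call `reserve(permits, System.nanoTime())` inside `mutex.withLock` and then `delay` for the returned time converted to milliseconds.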
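If you want an IntervalLimiter that allows X requests every Y seconds and then throws an exception, there is the RateLimiter in Resilience4J. Or, if you want something more fully featured, I'm working on a PR to create both a RateLimiter and an IntervalLimiter in the coroutines core project.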
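To make the "X requests every Y seconds, then throw" behaviour concrete, here is a minimal fixed-window sketch. It is neither Resilience4J's API nor the code from the PR mentioned above. `IntervalLimiterSketch`, `acquireOrThrow` and `RateLimitExceededException` are hypothetical names, and the clock is injected only so the example can run without waiting.

```kotlin
/** Thrown when the current interval has no permits left (hypothetical type). */
class RateLimitExceededException(message: String) : RuntimeException(message)

/**
 * Hypothetical fixed-interval limiter: at most [maxRequests] calls per
 * [intervalMillis]; further calls within the same interval throw.
 */
class IntervalLimiterSketch(
    private val maxRequests: Int,
    private val intervalMillis: Long,
    private val clock: () -> Long = { System.currentTimeMillis() },
) {
    private var windowStart = Long.MIN_VALUE // MIN_VALUE means "no window opened yet"
    private var used = 0

    @Synchronized // JVM-only; a Mutex would be the multiplatform alternative
    fun acquireOrThrow() {
        val now = clock()
        // Open a new window on first use or once the interval has elapsed.
        if (windowStart == Long.MIN_VALUE || now - windowStart >= intervalMillis) {
            windowStart = now
            used = 0
        }
        if (used >= maxRequests) {
            throw RateLimitExceededException("more than $maxRequests requests in $intervalMillis ms")
        }
        used++
    }
}

fun main() {
    var fakeNow = 0L
    val limiter = IntervalLimiterSketch(maxRequests = 2, intervalMillis = 1_000) { fakeNow }
    limiter.acquireOrThrow()
    limiter.acquireOrThrow()
    // The third call in the same interval is rejected.
    println(runCatching { limiter.acquireOrThrow() }.isFailure)
    fakeNow = 1_000
    limiter.acquireOrThrow() // a new interval has started, so this succeeds
}
```

A fixed window like this allows up to 2X requests around a window boundary; whether that matters depends on the service being called.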
(Coming from feature request Kotlin/kotlinx.coroutines#460, hi!)
Here is a rate limiter I keep in an internal kitchen-sink library. Simple*, multiplatform, no extra dependencies:
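    import kotlin.time.Duration
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.conflate
    import kotlinx.coroutines.flow.flow

    // Emits `resource` repeatedly, waiting `minDelay` between emissions.
    // conflate() keeps a slow collector from accumulating a backlog.
    public fun <T> rateLimiter(resource: T, minDelay: Duration): Flow<T> =
        flow {
            while (true) {
                emit(resource)
                delay(minDelay)
            }
        }.conflate()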
Usage:
    // Runs inside a suspend function; HttpClient, get and body come from Ktor.
    val endpoints = listOf("dokka", "kotlinx.coroutines", "kotlinx.serialization")
        .map { "https://api.github.com/repos/Kotlin/$it/events" }
    val responses = rateLimiter(HttpClient(), 500.milliseconds)
        .associateWithEach(endpoints)
        .map { (httpClient, uri) -> httpClient.get(uri).body<String>().also { println(it.length) } }
        .toList()
    println("got ${responses.size} responses")
* And the asterisk: I don't think `rateLimiter` is useful unless paired with the not-so-simple `associateWithEach` extension, which looks like this:
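    import kotlinx.coroutines.flow.Flow
    import kotlinx.coroutines.flow.transformWhile

    // Pairs each upstream element with the next item of `iterable` and
    // completes the flow once the iterable is exhausted.
    public fun <C, T> Flow<C>.associateWithEach(iterable: Iterable<T>): Flow<Pair<C, T>> =
        iterable.iterator().let { iter ->
            this.transformWhile { recvElem ->
                if (iter.hasNext()) emit(Pair(recvElem, iter.next()))
                iter.hasNext() // keep collecting only while items remain
            }
        }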
I used this simple bucket-algorithm implementation. It suspends the coroutine until a new token is available for the request.
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.sync.Mutex
    import kotlinx.coroutines.sync.withLock
    import kotlinx.datetime.Clock

    class SimpleBucketRateLimiter(
        private val numberOfRequest: Int,
        private val timePerRequestMills: Long,
    ) {
        private var lastRefillTime = 0L
        private var nextRefillTime = 0L
        private var tokens = numberOfRequest
        private val mutex = Mutex()

        // Suspends until a token is available, then consumes it.
        suspend fun tryConsume() {
            mutex.withLock {
                refill()
                while (tokens <= 0) {
                    println("RateLimiter: tokens not available, delay 100ms")
                    delay(100)
                    refill()
                }
                // Consume the token on both paths, including after waiting for a refill.
                tokens--
            }
        }

        // Resets the bucket to full once the current window has elapsed.
        private fun refill() {
            val now = Clock.System.now().toEpochMilliseconds()
            if (now < nextRefillTime) {
                return
            }
            lastRefillTime = now
            nextRefillTime = lastRefillTime + timePerRequestMills
            tokens = numberOfRequest
        }
    }
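The loop above wakes up every 100 ms to check for a refill. As a possible refinement, the wait could be computed from the next refill time instead of polled. `millisUntilToken` below is a hypothetical helper, not part of the class above, shown only to illustrate the arithmetic.

```kotlin
/**
 * Hypothetical helper: how long a caller would have to wait for a token.
 * Returns 0 when a token is available or the refill time has already passed.
 */
fun millisUntilToken(tokens: Int, nowMillis: Long, nextRefillMillis: Long): Long =
    if (tokens > 0) 0L else (nextRefillMillis - nowMillis).coerceAtLeast(0L)

fun main() {
    println(millisUntilToken(tokens = 3, nowMillis = 1_000, nextRefillMillis = 1_500)) // 0
    println(millisUntilToken(tokens = 0, nowMillis = 1_000, nextRefillMillis = 1_500)) // 500
    println(millisUntilToken(tokens = 0, nowMillis = 2_000, nextRefillMillis = 1_500)) // 0
}
```

Inside `tryConsume`, `delay(millisUntilToken(tokens, now, nextRefillTime))` could then stand in for `delay(100)`, assuming `now` is read from the same clock that `refill()` uses.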