我有一个应用程序需要从 API 端点请求一些数据。从该 API 端点返回的数据是分页的。假设我每次通话都会检索 1000 条记录。对于每条记录,我又发出 4 个请求来获取相关数据。另外,假设总共有 11,0000 条记录,即 11 页。
API 允许每分钟最多 60 个请求。对于检索到的每个页面,我都会对其进行映射以获取某些属性的相关记录。
我的尝试是创建一个函数,通过使用 kotlin 的延迟来观察速率限制。功能如下:
suspend fun <T> observeRateLimit(delayInMillis: Long, block: suspend () -> T): Deferred<T> = withContext(Dispatchers.IO) {
launch { delay(delayInMillis) }
async { block() }
}
然后在调用端点的函数内部使用该函数。
suspend fun populate(nextPage: Int?) {
getAllRecords(nextPage).map { result ->
val records = result.data.map {
// Wait for 1 minute
observeRateLimitAsync(60000) {
val id = it.id
val entity = it.mapToEntity()
// Fetch related record (1)
val related1 = fetchRelatedRecord1(id)
// Fetch related record (2)
val related2 = fetchRelatedRecord2(id)
// Fetch related record (3)
val related3 = fetchRelatedRecord3(id)
// Fetch related record (4)
val related4 = fetchRelatedRecord4(id)
// Process the entity
entity
}.await()
}
// Insert records to DB
// If next page exists
if (result.pages.nextUrl != null) populate(records.size + 1000)
}.flowOn(Dispatcher.IO).collect()
}
这可行,但正如您所注意到的,运行速度非常慢。请记住,大约有 11,000 条记录。
如何在不达到服务器速率限制的情况下高效地从 API 检索所有记录?
我也对探索其他想法感兴趣,这些想法将允许在本地数据库中缓存来自此端点的所有记录。
据我了解,您需要执行以下 API 调用才能获取所有数据:
总共 44,011 次 API 调用。如果速率限制为每分钟 60 次呼叫,则 44,011/60 分钟或仅略多于 12 小时。这与您提出请求的效率无关,这是端点允许的最佳情况。
我们显然无法帮助您使端点更快。不过,我们能做的就是帮助您接近 12 小时的最佳情况。在现实生活中的例子中,可能更像 13。
一个明显的问题是,如果将所有这些都放在一边是否有意义,最好的方法是将速率限制方面与调用者解耦。它应该是 API 的一个组成部分,以便您的
populate
函数和 getAllRecords
lambda 不需要计算等待下一个请求的时间。他们的责任应该只是循环所有页面并检索附加的相关数据。
需要更改的内容取决于
getAllRecords
和 fetchRelatedRecord
函数实际如何从 API 请求数据。由于您没有向我们展示任何有关 API 界面的内容,因此以下内容仅展示一般方法。假设您的 API 如下所示:
class API {
suspend fun unlimitedRequest(request: Request): Result {
// immediately send request to endpoint
}
}
这将在调用后立即从端点请求数据。现在,我们不再直接从
getAllRecords
和 fetchRelatedRecord
函数调用此方法,而是实现一个新的 rateLimitedRequest
方法,该方法尊重端点的速率限制行为,并尽快调用 unlimitedRequest
,同时也等待只要有需要:
class API(
maxRequestsCount: Int = 60,
private val maxRequestsDuration: Duration = 1.minutes,
private val waitingScope: CoroutineScope,
) {
private val channel = Channel<Unit>(maxRequestsCount)
suspend fun rateLimitedRequest(request: Request): Result {
// Wait for an empty slot in the requests queue by trying to place a token in the channel.
channel.send(Unit)
// Do the request.
val result = unlimitedRequest(request)
// Remove a token from the channel after maxRequestsDuration.
waitingScope.launch {
delay(maxRequestsDuration)
channel.tryReceive()
}
return result
}
private suspend fun unlimitedRequest(request: Request): Result {
// immediately send request to endpoint
}
}
您的 API 现在需要使用 maxRequestsCount
进行初始化,作为每个
maxRequestsDuration
允许的请求数量。对于您的情况,这将是
60
每
1.minutes
。
waitingScope
是协程范围,将用于等待直到可以发送下一个请求。这是如何运作的
Channel
就像一个容量为
maxRequestsCount
的阻塞队列(对你来说是 60)。当队列未满时,意味着可以安全地发出请求。通过
channel.send(Unit)
,我们尝试在队列中占据一个位置。如果队列已满(即,我们达到了速率限制),该函数将等待,直到有一个槽空闲。当我们占据队列中的一个槽位时,我们现在可以自由地向端点发送请求。当我们完成后,我们需要释放队列中的槽位。现在,当我们调用
channel.tryReceive()
(从通道中删除一个元素)时,等待进入队列的下一个请求将立即再次调用端点。我们想要的是将其延迟
maxRequestsDuration
(这是在单独的协程中完成的,以便可以立即重新调整结果)。如果队列的容量 (
maxRequestsCount
) 为
1
,这只会将下一个请求延迟一分钟。速率限制行为为 1/分钟。如果容量设置为
2
,则将立即发出下一个请求(因为队列中还有另一个空闲插槽),但是第一个和第二个请求只会在一分钟后释放其插槽,因此现在队列已满。第三个请求必须等待前两个请求之一释放其插槽。这实际上会导致 2/分钟的速率限制行为。因此,
60
的容量允许每分钟 60 个请求。
一定缓存您的结果。这可以通过房间数据库来完成。