如何正确调用限速API?

问题描述 投票:0回答:1

我有一个应用程序需要从 API 端点请求一些数据。从该 API 端点返回的数据是分页的。假设我每次通话都会检索 1000 条记录。对于每条记录,我又发出 4 个请求来获取相关数据。另外,假设总共有 11,0000 条记录,即 11 页。

API 允许每分钟最多 60 个请求。对于检索到的每个页面,我都会对其进行映射以获取某些属性的相关记录。

我的尝试是创建一个函数,通过使用 kotlin 的延迟来观察速率限制。功能如下:

suspend fun <T> observeRateLimit(delayInMillis: Long, block: suspend () -> T): Deferred<T> = withContext(Dispatchers.IO) {
    launch { delay(delayInMillis) }
    async { block() }
}

然后在调用端点的函数内部使用该函数。

suspend fun populate(nextPage: Int?) {
   getAllRecords(nextPage).map { result ->
        val records = result.data.map {
             // Wait for 1 minute
             observeRateLimitAsync(60000) {
                 val id = it.id
                 val entity = it.mapToEntity()

                 // Fetch related record (1)
                 val related1 = fetchRelatedRecord1(id)

                 // Fetch related record (2)
                 val related2 = fetchRelatedRecord2(id)

                 // Fetch related record (3)
                 val related3 = fetchRelatedRecord3(id)

                 // Fetch related record (4)
                 val related4 = fetchRelatedRecord4(id)

                 // Process the entity
                 entity
            }.await()
        }

        // Insert records to DB

        // If next page exists
        if (result.pages.nextUrl != null) populate(records.size + 1000)
   }.flowOn(Dispatcher.IO).collect()
}

这可行,但正如您所注意到的,运行速度非常慢。请记住,大约有 11,000 条记录。

如何在不达到服务器速率限制的情况下高效地从 API 检索所有记录?

我也对探索其他想法感兴趣,这些想法将允许在本地数据库中缓存来自此端点的所有记录。

android kotlin rate-limiting
1个回答
0
投票

据我了解,您需要执行以下 API 调用才能获取所有数据:

  1. 11页(每页1000条记录)
  2. 对于每条记录 4 个额外调用:11 * 1000 * 4 = 44,000

总共 44,011 次 API 调用。如果速率限制为每分钟 60 次呼叫,则 44,011/60 分钟或仅略多于 12 小时。这与您提出请求的效率无关,这是端点允许的最佳情况。

我们显然无法帮助您使端点更快。不过,我们能做的就是帮助您接近 12 小时的最佳情况。在现实生活中的例子中,可能更像 13。

一个明显的问题是,如果将所有这些都放在一边是否有意义,最好的方法是将速率限制方面与调用者解耦。它应该是 API 的一个组成部分,以便您的

populate
函数和
getAllRecords
lambda 不需要计算等待下一个请求的时间。他们的责任应该只是循环所有页面并检索附加的相关数据。

需要更改的内容取决于

getAllRecords
fetchRelatedRecord
函数实际如何从 API 请求数据。由于您没有向我们展示任何有关 API 界面的内容,因此以下内容仅展示一般方法。假设您的 API 如下所示:

class API {
    suspend fun unlimitedRequest(request: Request): Result {
        // immediately send request to endpoint
    }
}

这将在调用后立即从端点请求数据。现在,我们不再直接从

getAllRecords
fetchRelatedRecord
函数调用此方法,而是实现一个新的
rateLimitedRequest
方法,该方法尊重端点的速率限制行为,并尽快调用
unlimitedRequest
,同时也等待只要有需要:

class API( maxRequestsCount: Int = 60, private val maxRequestsDuration: Duration = 1.minutes, private val waitingScope: CoroutineScope, ) { private val channel = Channel<Unit>(maxRequestsCount) suspend fun rateLimitedRequest(request: Request): Result { // Wait for an empty slot in the requests queue by trying to place a token in the channel. channel.send(Unit) // Do the request. val result = unlimitedRequest(request) // Remove a token from the channel after maxRequestsDuration. waitingScope.launch { delay(maxRequestsDuration) channel.tryReceive() } return result } private suspend fun unlimitedRequest(request: Request): Result { // immediately send request to endpoint } }
您的 API 现在需要使用 

maxRequestsCount

 进行初始化,作为每个 
maxRequestsDuration
 允许的请求数量。对于您的情况,这将是 
60
1.minutes
waitingScope
 是协程范围,将用于等待直到可以发送下一个请求。

这是如何运作的

使用的

Channel

 就像一个容量为 
maxRequestsCount
 的阻塞队列(对你来说是 60)。当队列未满时,意味着可以安全地发出请求。通过 
channel.send(Unit)
,我们尝试在队列中占据一个位置。如果队列已满(即,我们达到了速率限制),该函数将等待,直到有一个槽空闲。

当我们占据队列中的一个槽位时,我们现在可以自由地向端点发送请求。当我们完成后,我们需要释放队列中的槽位。现在,当我们调用

channel.tryReceive()

 (从通道中删除一个元素)时,等待进入队列的下一个请求将立即再次调用端点。我们想要的是将其延迟 
maxRequestsDuration
(这是在单独的协程中完成的,以便可以立即重新调整结果)。

如果队列的容量 (

maxRequestsCount

) 为 
1
,这只会将下一个请求延迟一分钟。速率限制行为为 1/分钟。

如果容量设置为

2

,则将立即发出下一个请求(因为队列中还有另一个空闲插槽),但是第一个和第二个请求只会在一分钟后释放其插槽,因此现在队列已满。第三个请求必须等待前两个请求之一释放其插槽。这实际上会导致 2/分钟的速率限制行为。

因此,

60

 的容量允许每分钟 60 个请求。


如果您确实想继续执行此操作,尽管这总共需要 13 个小时,您应该

一定缓存您的结果。这可以通过房间数据库来完成。

© www.soinside.com 2019 - 2024. All rights reserved.