如何处理速率受限的API调用[已关闭]

问题描述 投票:0回答:1

我们正在使用多个使用 API 的外部提供商,每个提供商都有自己的策略、速率限制和规则。 我希望能够在多个环境中调用这些 api,而不会达到速率限制(因为其中一些 API 会因为我调用它们太多而“惩罚”我)。

是否有任何包知道如何处理它,或者我需要实现自己的解决方案?

我尝试的解决方案是使用速率限制功能的包,但由于它无法与外部数据库异步工作,因此该解决方案不起作用。

python api rate-limiting
1个回答
0
投票

您必须购买速率限制器 Sass,或者推出自己的速率限制器。如果您完全控制调用这些 API 的所有代码,则可以使用一个 Python 库,该库可以通过 redis 进行协调,在 JSON 或 TOML 文件中对限制进行声明性配置,并提供装饰器或上下文管理器(要在

with
语句)可以在 2-3 天内构建。这里的答案只能给出一般方向。

正如您提到的“多个环境”,这种在集中位置跟踪费率的解决方案是唯一要做的事情。我重申,

redis
将是跟踪所有正在进行的调用的合适工具 - 它既具有足够的性能,又提供所需的原子、分布式可访问的数据结构,使这种跟踪不再那么麻烦。

对于单个进程,在单个线程中,使用 asyncio,一个 asyncio.Semaphore` 对象可能就足够了,或者可能只是一个小的专门化,这会延迟实现所使用的资源。

这个可以做到(也许窗口计算可能更优化 - 但这对于单个工人来说是可以的):

import asyncio
import time
import sys


class WindowedSemaphore(asyncio.Semaphore):
    def __init__(self, value=1, window=60):
        self.__initial_value = value
        self.__window = window
        self.__last_window_start = 0
        self.__peak_since_last_start = 0
        super().__init__(value=value)

    async def acquire(self):
        # A fine grained, reasonable constant to use
        # for sleeping times as things happen:
        sleep_time = sys.getswitchinterval()
        # Hold the call to
        while (
            time.monotonic() - self.__last_window_start <= self.__window
            and self.__peak_since_last_start > self.__initial_value
        ):
            await asyncio.sleep(sleep_time)

        result = super().acquire()

        if (now := time.monotonic()) - self.__last_window_start > self.__window:
            self.__last_window_start = now
            self.__peak_since_last_start = 1
        else:
            self.__peak_since_last_start += 1
        return result

© www.soinside.com 2019 - 2024. All rights reserved.