如何使用aiohttp和速率限制?

问题描述 投票:0回答:1

我这周开始学习 Python,作为第一个项目,我选择开发一个简单的应用程序,该应用程序可以从 Riot API 获取数据,对其进行处理,然后插入到 mySQL 数据库中。我成功地使其同步工作,没有出现任何问题,这对于该项目来说已经足够了,因为该 API 的速率限制为每秒 20 次调用和每 120 秒 100 次调用,并且在 40 秒内就已经达到了速率限制。

但是,我想改进它,因为有可能获得更好的密钥,其提供的速率限制远高于我所附加的密钥。

我尝试使用多线程来解决问题,通过设置一个具有 5 个工作线程的线程执行器,其中每个工作线程都会为每个匹配项进行获取,从池中获取连接,将数据插入数据库,然后获取下一个可用的数据匹配来获取,但这种方法看起来不太有前途,因为它插入了多次小数据块。 我的代码使用单个 fetch 函数,该函数接收从其他函数生成的 URL,代码结构如下所示:

def fetch(url):
    while True:
        response = session.get(url, headers={"X-Riot-Token": f"{api_key}"})
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 429:
            retry_after = int(response.headers.get("Retry-After", 1))
            time.sleep(retry_after+1)
        else:
            return None

结构差不多就是这样,在while循环里面有一个try except。 while 循环阻止请求不被完成,如果它收到速率限制请求,它只会等到超时结束。

通过多线程的工作,我还添加了全局重试,并在执行请求之前验证它是否受到速率限制,但它不够快,并且在速率限制开始后它仍然执行了一些请求,这可以导致此 API 上出现黑名单。

我尝试使用 asyncio 和 aiohttp,速度更快,但每秒 20 个请求和每 120 秒 100 个请求都已达到,并且请求继续运行。

我看到了关于信号量的内容,但我不太明白它是如何工作的,我应该创建两个信号量,每个速率限制一个,根据请求获取每个信号量,执行 asyncio.sleep 然后相应地释放它们吗?下一个请求还会发生吗?我在这里看到了其他帖子,但是没有一个帖子介绍了如何使用两种不同的速率限制。

基本结构是: 同步请求获取比赛列表 -> 异步请求每场比赛获取各自的数据 -> 异步请求获取比赛中每个球员的信息 -> 处理后将所有数据插入数据库。

python python-asyncio aiohttp rate-limiting
1个回答
0
投票

信号量对于限制同时任务的数量会更有用。对于速率限制,您可以创建一个简单的计数器,该计数器在达到 0 时阻塞并在时间限制时重置。 (我还使用 Gubernator 作为速率限制的外部服务,这对于更复杂的情况可能很有用)。

所以,作为一个快速尝试,也许是这样的:

class RateLimit:
    def __init__(self, limits, reset_times):
        self._limits = limits
        self._reset_times = reset_times
        self._counts = list(limits)

    async def limit(self):
        while True:
            if all(c > 0 for c in self._counts):
                for i in range(len(self._counts)):
                    self._counts[i] -= 1
                return
            await self._waiter.wait()

    async def __aenter__(self):
        self._task = asyncio.create_task(self._reset())
        return self

    async def __aexit__(self, ...):
        self._task.cancel()
        await self._task

    async def _reset(self):
        times = list(self._reset_times)
        while True:
            self._waiter = asyncio.Event()
            delay = min(times)
            try:
                await asyncio.sleep(delay)
            except asyncio.CancelledError:
                break
            for i in range(len(times)):
                times[i] -= delay
                if times[i] <= 0:
                    self._counts[i] = self._limits[i]
                    times[i] += self._reset_times[i]

            self._waiter.set()

async with RateLimiter((100, 20), (120, 1)) as rl:
    await rl.limit()  # Before each request

为了使其更加健壮,也许应该使用一个队列,以便始终按相同的顺序发出请求。

可能也值得查看现有的库(快速搜索找到 aiolimiter 和 asynciolimiter),尽管我不确定这些库在多个限制下的表现如何(例如,是否可以以可靠的方式组合 2 个限制器)。

© www.soinside.com 2019 - 2024. All rights reserved.