我这周开始学习 Python,作为第一个项目,我选择开发一个简单的应用程序,该应用程序可以从 Riot API 获取数据,对其进行处理,然后插入到 mySQL 数据库中。我成功地使其同步工作,没有出现任何问题,这对于该项目来说已经足够了,因为该 API 的速率限制为每秒 20 次调用和每 120 秒 100 次调用,并且在 40 秒内就已经达到了速率限制。
但是,我想改进它,因为有可能获得更好的密钥,其提供的速率限制远高于我所附加的密钥。
我尝试使用多线程来解决问题,通过设置一个具有 5 个工作线程的线程执行器,其中每个工作线程都会为每个匹配项进行获取,从池中获取连接,将数据插入数据库,然后获取下一个可用的数据匹配来获取,但这种方法看起来不太有前途,因为它插入了多次小数据块。 我的代码使用单个 fetch 函数,该函数接收从其他函数生成的 URL,代码结构如下所示:
def fetch(url):
while True:
response = session.get(url, headers={"X-Riot-Token": f"{api_key}"})
if response.status_code == 200:
return response.json()
elif response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 1))
time.sleep(retry_after+1)
else:
return None
结构差不多就是这样,在while循环里面有一个try except。 while 循环阻止请求不被完成,如果它收到速率限制请求,它只会等到超时结束。
通过多线程的工作,我还添加了全局重试,并在执行请求之前验证它是否受到速率限制,但它不够快,并且在速率限制开始后它仍然执行了一些请求,这可以导致此 API 上出现黑名单。
我尝试使用 asyncio 和 aiohttp,速度更快,但每秒 20 个请求和每 120 秒 100 个请求都已达到,并且请求继续运行。
我看到了关于信号量的内容,但我不太明白它是如何工作的,我应该创建两个信号量,每个速率限制一个,根据请求获取每个信号量,执行 asyncio.sleep 然后相应地释放它们吗?下一个请求还会发生吗?我在这里看到了其他帖子,但是没有一个帖子介绍了如何使用两种不同的速率限制。
基本结构是: 同步请求获取比赛列表 -> 异步请求每场比赛获取各自的数据 -> 异步请求获取比赛中每个球员的信息 -> 处理后将所有数据插入数据库。
信号量对于限制同时任务的数量会更有用。对于速率限制,您可以创建一个简单的计数器,该计数器在达到 0 时阻塞并在时间限制时重置。 (我还使用 Gubernator 作为速率限制的外部服务,这对于更复杂的情况可能很有用)。
所以,作为一个快速尝试,也许是这样的:
class RateLimit:
def __init__(self, limits, reset_times):
self._limits = limits
self._reset_times = reset_times
self._counts = list(limits)
async def limit(self):
while True:
if all(c > 0 for c in self._counts):
for i in range(len(self._counts)):
self._counts[i] -= 1
return
await self._waiter.wait()
async def __aenter__(self):
self._task = asyncio.create_task(self._reset())
return self
async def __aexit__(self, ...):
self._task.cancel()
await self._task
async def _reset(self):
times = list(self._reset_times)
while True:
self._waiter = asyncio.Event()
delay = min(times)
try:
await asyncio.sleep(delay)
except asyncio.CancelledError:
break
for i in range(len(times)):
times[i] -= delay
if times[i] <= 0:
self._counts[i] = self._limits[i]
times[i] += self._reset_times[i]
self._waiter.set()
async with RateLimiter((100, 20), (120, 1)) as rl:
await rl.limit() # Before each request
为了使其更加健壮,也许应该使用一个队列,以便始终按相同的顺序发出请求。
可能也值得查看现有的库(快速搜索找到 aiolimiter 和 asynciolimiter),尽管我不确定这些库在多个限制下的表现如何(例如,是否可以以可靠的方式组合 2 个限制器)。