场景:我需要从 Web 应用程序的 API 收集分页数据,该 API 的调用限制为每分钟 100 次。我需要返回的 API 对象每页包含 100 个项目,总共 105 个页面(并且还在不断增加)(总共约 10,500 个项目)。同步代码大约需要 15 分钟才能检索所有页面,因此不必担心会达到调用限制。但是,我想加快数据检索速度,因此我使用
asyncio
和 aiohttp
实现了异步调用。现在数据可在 15 秒内下载 - 很好。
问题:我现在已达到呼叫限制,因此在最近 5 次左右的呼叫中收到 403 错误。
建议的解决方案 我实现了在
try/except
函数中找到的 get_data()
。我拨打电话,然后当由于 403: Exceeded call limit
导致呼叫不成功时,我会后退 back_off
秒并重试最多 retries
次:
async def get_data(session, url):
retries = 3
back_off = 60 # seconds to try again
for _ in range(retries):
try:
async with session.get(url, headers=headers) as response:
if response.status != 200:
response.raise_for_status()
print(retries, response.status, url)
return await response.json()
except aiohttp.client_exceptions.ClientResponseError as e:
retries -= 1
await asyncio.sleep(back_off)
continue
async def main():
async with aiohttp.ClientSession() as session:
attendee_urls = get_urls('attendee') # returns list of URLs to call asynchronously in get_data()
attendee_data = await asyncio.gather(*[get_data(session, attendee_url) for attendee_url in attendee_urls])
return attendee_data
if __name__ == '__main__':
data = asyncio.run(main())
问题:如何限制 aiohttp 调用,使其保持在 100 次调用/分钟阈值以下,而不发出 403 请求退出?我已经尝试了以下模块,但它们似乎都没有做任何事情:
ratelimiter
、ratelimit
和asyncio-throttle
。
目标:每分钟进行 100 次异步调用,但如有必要,请退出并重试(403:超出调用限制)。
通过在每个请求之前添加延迟,您可以实现“最多 100 个请求/分钟”。
100 个请求/分钟相当于 1 个请求/0.6 秒。
async def main():
async with aiohttp.ClientSession() as session:
attendee_urls = get_urls('attendee') # returns list of URLs to call asynchronously in get_data()
coroutines = []
for attendee_url in attendee_urls:
coroutines.append(asyncio.Task(get_data(session, attendee_url)))
await asyncio.sleep(0.6)
attendee_data = asyncio.gather(*coroutines)
return attendee_data
除了请求速率限制之外,API 通常还会限制请求数量。同时请求的数量。如果是这样,您可以使用BoundedSempahore。
async def main():
sema = asyncio.BoundedSemaphore(50) # Assuming a concurrent requests limit of 50
...
coroutines.append(get_data(sema, session, attendee_url))
...
def get_data(sema, session, attendee_url):
...
for _ in range(retries):
try:
async with sema:
response = await session.get(url, headers=headers):
if response.status != 200:
response.raise_for_status()
...