我已经看过here。但是仍然无法摆脱困境。这是我目前正在完成的操作:
urls_without_rate_limit =
[
'http://httpbin.org/get'
'http://httpbin.org/get',
'http://httpbin.org/get',
'http://httpbin.org/get',
'http://httpbin.org/get'
]
urls_with_rate_limit =
[
'http://eu.httpbin.org/get'
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get',
'http://eu.httpbin.org/get'
]
api_rate = 2
api_limit = 6
loop = asyncio.get_event_loop()
loop.run_until_complete(
process(urls=urls_without_rate_limit, rate=0, limit=len(url_list)))
loop.run_until_complete(
process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
limit = asyncio.Semaphore(limit)
f = Fetch(
rate=rate,
limit=limit
)
tasks = []
for url in urls:
tasks.append(f.make_request(url=url))
results = await asyncio.gather(*tasks)
如您所见,它将完成process
的第一轮,然后开始第二轮速率限制。
效果很好,但是有没有办法我可以以不同的速率限制同时开始两回合?
tvm
我将详细说明我的评论。因此,您可以尝试使用自己的解决方案(即使我在此处提供完整的代码)。
您可以拥有定义一些规则的字典(API->每秒速率限制:]
APIS_RATE_LIMIT_PER_S = {
"http://api.mathjs.org/v4?precision=5": 1,
"http://api.mathjs.org/v4?precision=2": 3,
}
然后您可以根据请求URL来决定选择哪个信号灯(实际上,您必须进行一些解析才能获得要控制的端点)。一旦有了,只需使用信号量来确保限制执行请求的并发进程数。难题的最后一部分显然是在释放信号量之前增加了延迟。
我将获得建议的here的不同版本,但基本上是相同的解决方案。我只是这样做了,所以您可以修改会话对象,以便每次对session.get
的调用都将自动应用速率限制控制。
def set_rate_limits(session, apis_rate_limits_per_s):
semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}
@asynccontextmanager
async def limit_rate(url):
await semaphores[url].acquire()
start = time.time()
try:
yield semaphores[url]
finally:
duration = time.time() - start
await asyncio.sleep(1 - duration)
semaphores[url].release()
def add_limit_rate(coroutine):
async def coroutine_with_rate_limit(url, *args, **kwargs):
async with limit_rate(url):
return await coroutine(url, *args, **kwargs)
return coroutine_with_rate_limit
session.get = add_limit_rate(session.get)
session.post = add_limit_rate(session.post)
return session
[注意,使用add_limit_rate
您可以将速率限制控件添加到任何以API端点为第一个参数的协程中。但是这里我们只修改session.get
和session.post
。
最后,您可以像这样使用set_rate_limits
函数:
async def main():
apis = APIS_RATE_LIMIT_PER_S.keys()
params = [
{"expr" : "2^2"},
{"expr" : "1/0.999"},
{"expr" : "1/1.001"},
{"expr" : "1*1.001"},
]
async with aiohttp.ClientSession() as session:
session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
api_requests = [get_text_result(session, url, params=p) for url, p in product(apis, params)]
text_responses = await asyncio.gather(*api_requests)
print(text_responses)
async def get_text_result(session, url, params=None):
result = await session.get(url, params=params)
return await result.text()
如果运行此代码,您将不会看到很多事情,可以在print
的此处和此处添加一些set_rate_limits
以“确保”正确执行速率限制:
import time
# [...] change this part :
def add_limit_rate(coroutine):
async def coroutine_with_rate_limit(url, *args, **kwargs):
async with limit_rate(url):
######### debug
global request_count
request_count += 1
this_req_id = request_count
rate_lim = APIS_RATE_LIMIT_PER_S[url]
print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
########
r = await coroutine(url, *args, **kwargs)
######### debug
print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
#########
return r
如果运行此示例asyncio.run(main())
,则应该得到类似以下内容:
request #1 -> 1ms rate 1/s
request #2 -> 2ms rate 3/s
request #3 -> 3ms rate 3/s
request #4 -> 3ms rate 3/s
request #1 <- 1003ms rate 1/s
request #2 <- 1004ms rate 3/s
request #3 <- 1004ms rate 3/s
request #5 -> 1004ms rate 1/s
request #6 -> 1005ms rate 3/s
request #4 <- 1006ms rate 3/s
request #5 <- 2007ms rate 1/s
request #6 <- 2007ms rate 3/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
似乎这里尊重速率限制,特别是我们可以看看每秒限制1个请求的速率的API:
request #1 -> 1ms rate 1/s
request #1 <- 1003ms rate 1/s
request #5 -> 1004ms rate 1/s
request #5 <- 2007ms rate 1/s
request #7 -> 2007ms rate 1/s
request #7 <- 3008ms rate 1/s
request #8 -> 3008ms rate 1/s
request #8 <- 4010ms rate 1/s
另一方面,此解决方案并不令人满意,因为我们在所有请求中人为地添加了1 ping。这是因为这部分代码:
await asyncio.sleep(1 - duration)
semaphores[url].release()
这里的问题是,在等待将控制权交还给事件循环(安排另一个任务,另一个请求)之前,我们正在等待睡眠完成。可以使用以下代码轻松解决:
asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))
[release_after_delay
仅是:
async def release_after_delay(semaphore, delay):
await asyncio.sleep(delay)
semaphore.release()
asyncio.create_task
函数使协程“在后台运行”。这意味着在此代码中,该信号量将在以后释放,但是我们不需要等待其释放就可以将控制权交还给even循环(这意味着可以调度其他请求)。换句话说,我们不在乎此协程的结果,我们只是希望它在将来的某个时候运行(这可能就是为什么此函数以前称为asyncio.create_task
的原因)。
使用此补丁,我们为API设置了以下内容,并将速率限制设置为每秒一个请求:
ensure_future
它绝对接近我们期望此代码执行的操作。我们会尽快从API获得每个响应(在此示例中,ping为200ms / 37ms / 46ms / 41ms)。并且也要遵守速率限制。
这可能不是最漂亮的代码,但是它可以作为您使用的开始。也许您可以使用它制作一个干净的程序包,但是我认为这可能是其他人可能会喜欢的东西。