[aiohttp:按域类型每秒的速率限制请求数

问题描述 投票:1回答:1

我已经看过here。但是仍然无法摆脱困境。这是我目前正在完成的操作:

urls_without_rate_limit = 
    [
       'http://httpbin.org/get'
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get',
       'http://httpbin.org/get'
    ]

urls_with_rate_limit = 
    [
       'http://eu.httpbin.org/get'
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get',
       'http://eu.httpbin.org/get'
    ]

api_rate = 2
api_limit = 6

loop = asyncio.get_event_loop()
    loop.run_until_complete(
        process(urls=urls_without_rate_limit, rate=0, limit=len(url_list)))

    loop.run_until_complete(
        process(urls=urls_with_rate_limit, rate=api_rate, limit=api_limit))
async def process(urls, rate, limit):
    limit = asyncio.Semaphore(limit)

    f = Fetch(
        rate=rate,
        limit=limit
    )

    tasks = []
    for url in urls:
        tasks.append(f.make_request(url=url))

    results = await asyncio.gather(*tasks)

如您所见,它将完成process的第一轮,然后开始第二轮速率限制。

效果很好,但是有没有办法我可以以不同的速率限制同时开始两回合?

tvm

python-3.x python-asyncio aiohttp
1个回答
0
投票

我将详细说明我的评论。因此,您可以尝试使用自己的解决方案(即使我在此处提供完整的代码)。

您可以拥有定义一些规则的字典(API->每秒速率限制:]

APIS_RATE_LIMIT_PER_S = {
  "http://api.mathjs.org/v4?precision=5": 1,
  "http://api.mathjs.org/v4?precision=2": 3,
}

然后您可以根据请求URL来决定选择哪个信号灯(实际上,您必须进行一些解析才能获得要控制的端点)。一旦有了,只需使用信号量来确保限制执行请求的并发进程数。难题的最后一部分显然是在释放信号量之前增加了延迟。

我将获得建议的here的不同版本,但基本上是相同的解决方案。我只是这样做了,所以您可以修改会话对象,以便每次对session.get的调用都将自动应用速率限制控制。

def set_rate_limits(session, apis_rate_limits_per_s):
    semaphores = {api: asyncio.Semaphore(s) for api, s in apis_rate_limits_per_s.items()}

    @asynccontextmanager
    async def limit_rate(url):
        await semaphores[url].acquire() 
        start = time.time()
        try:
            yield semaphores[url]
        finally:
            duration = time.time() - start
            await asyncio.sleep(1 - duration)
            semaphores[url].release()

    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                return await coroutine(url, *args, **kwargs)

        return coroutine_with_rate_limit

    session.get = add_limit_rate(session.get)
    session.post = add_limit_rate(session.post)
    return session

[注意,使用add_limit_rate您可以将速率限制控件添加到任何以API端点为第一个参数的协程中。但是这里我们只修改session.getsession.post

最后,您可以像这样使用set_rate_limits函数:

async def main():
    apis = APIS_RATE_LIMIT_PER_S.keys()
    params = [
        {"expr" : "2^2"},
        {"expr" : "1/0.999"},
        {"expr" : "1/1.001"},
        {"expr" : "1*1.001"},
    ]
    async with aiohttp.ClientSession() as session:
        session = set_rate_limits(session, APIS_RATE_LIMIT_PER_S)
        api_requests = [get_text_result(session, url, params=p) for url, p  in product(apis, params)]
        text_responses = await asyncio.gather(*api_requests)
        print(text_responses)


async def get_text_result(session, url, params=None):
    result = await session.get(url, params=params)
    return await result.text()

如果运行此代码,您将不会看到很多事情,可以在print的此处和此处添加一些set_rate_limits以“确保”正确执行速率限制:

import time

# [...] change this part : 
    def add_limit_rate(coroutine):

        async def coroutine_with_rate_limit(url, *args, **kwargs):
            async with limit_rate(url):
                ######### debug 
                global request_count
                request_count += 1
                this_req_id = request_count
                rate_lim = APIS_RATE_LIMIT_PER_S[url]
                print(f"request #{this_req_id} -> \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
                ########
                r = await coroutine(url, *args, **kwargs)

            ######### debug 
            print(f"request #{this_req_id} <- \t {(time.time() - start)*1000:5.0f}ms \t rate {rate_lim}/s")
            ######### 
            return r

如果运行此示例asyncio.run(main()),则应该得到类似以下内容:

request #1 ->        1ms     rate 1/s
request #2 ->        2ms     rate 3/s
request #3 ->        3ms     rate 3/s
request #4 ->        3ms     rate 3/s
request #1 <-     1003ms     rate 1/s
request #2 <-     1004ms     rate 3/s
request #3 <-     1004ms     rate 3/s
request #5 ->     1004ms     rate 1/s
request #6 ->     1005ms     rate 3/s
request #4 <-     1006ms     rate 3/s
request #5 <-     2007ms     rate 1/s
request #6 <-     2007ms     rate 3/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

似乎这里尊重速率限制,特别是我们可以看看每秒限制1个请求的速率的API:

request #1 ->        1ms     rate 1/s
request #1 <-     1003ms     rate 1/s
request #5 ->     1004ms     rate 1/s
request #5 <-     2007ms     rate 1/s
request #7 ->     2007ms     rate 1/s
request #7 <-     3008ms     rate 1/s
request #8 ->     3008ms     rate 1/s
request #8 <-     4010ms     rate 1/s

另一方面,此解决方案并不令人满意,因为我们在所有请求中人为地添加了1 ping。这是因为这部分代码:

await asyncio.sleep(1 - duration)
semaphores[url].release()

这里的问题是,在等待将控制权交还给事件循环(安排另一个任务,另一个请求)之前,我们正在等待睡眠完成。可以使用以下代码轻松解决:

asyncio.create_task(release_after_delay(semaphores[url], 1 - duration))    

[release_after_delay仅是:

async def release_after_delay(semaphore, delay):
    await asyncio.sleep(delay)
    semaphore.release()

asyncio.create_task函数使协程“在后台运行”。这意味着在此代码中,该信号量将在以后释放,但是我们不需要等待其释放就可以将控制权交还给even循环(这意味着可以调度其他请求)。换句话说,我们不在乎此协程的结果,我们只是希望它在将来的某个时候运行(这可能就是为什么此函数以前称为asyncio.create_task的原因)。

使用此补丁,我们为API设置了以下内容,并将速率限制设置为每秒一个请求:

ensure_future

它绝对接近我们期望此代码执行的操作。我们会尽快从API获得每个响应(在此示例中,ping为200ms / 37ms / 46ms / 41ms)。并且也要遵守速率限制。

这可能不是最漂亮的代码,但是它可以作为您使用的开始。也许您可以使用它制作一个干净的程序包,但是我认为这可能是其他人可能会喜欢的东西。

© www.soinside.com 2019 - 2024. All rights reserved.