aiohttp: rate limiting parallel requests

Problem description (0 votes, 5 answers)

APIs often have rate limits that users have to follow. As an example, let's take 50 requests per second. Sequential requests take 0.5-1 second each and are therefore far too slow to come close to that limit. Parallel requests with aiohttp, however, exceed the rate limit.

To poll the API as fast as allowed, the rate of parallel calls needs to be limited.

The examples I have found so far decorate session.get, roughly like this:

session.get = rate_limited(max_calls_per_second)(session.get)

This works well for sequential calls. Trying to implement it for parallel calls does not work as intended.
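
For context, such a decorator usually enforces a minimum interval between the starts of successive calls. Here is one hypothetical sketch of that shape, not the actual implementation referenced above:

import asyncio
import functools

def rate_limited(max_calls_per_second):
    min_interval = 1.0 / max_calls_per_second

    def decorator(func):
        lock = asyncio.Lock()
        last_call = 0.0

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            nonlocal last_call
            # serialize the timing bookkeeping so call starts are spaced
            # at least min_interval apart
            async with lock:
                now = asyncio.get_running_loop().time()
                delay = min_interval - (now - last_call)
                if delay > 0:
                    await asyncio.sleep(delay)
                last_call = asyncio.get_running_loop().time()
            return await func(*args, **kwargs)
        return wrapper
    return decorator

Note that the wrapped session.get then returns a coroutine, so it has to be used as response = await session.get(url) rather than async with session.get(url).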

Here is some code as an example:

async with aiohttp.ClientSession() as session:
    session.get = rate_limited(max_calls_per_second)(session.get)
    tasks = (asyncio.ensure_future(download_coroutine(  
          timeout, session, url)) for url in urls)
    process_responses_function(await asyncio.gather(*tasks))

The problem with this is that it rate-limits the queueing of the tasks. The execution with gather still happens more or less at the same time. Worst of both worlds ;-).


Yes, I found a similar question right here: aiohttp: set maximum number of requests per second, but neither reply answers the actual question of limiting the rate of requests. Also, Quentin Pradet's blog post only works for rate-limiting the queueing.

To wrap it up: how can one limit the number of requests per second for parallel aiohttp requests?

    

python parallel-processing python-asyncio aiohttp
5 Answers
25 votes

asyncio has an object called Semaphore inside; it works like an asynchronous RLock:
semaphore = asyncio.Semaphore(50)
#...
async def limit_wrap(url):
    async with semaphore:
        # do what you want
#...
coros = [limit_wrap(url) for url in urls]
results = await asyncio.gather(*coros)
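
Filled in with aiohttp, a runnable version of this pattern might look as follows; the request body and the example URL are placeholders, not part of the original answer:

import asyncio
import aiohttp

async def main(urls):
    # at most 50 requests in flight at any moment
    semaphore = asyncio.Semaphore(50)

    async def limit_wrap(session, url):
        async with semaphore:
            async with session.get(url) as resp:
                return await resp.text()

    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(limit_wrap(session, url) for url in urls))

results = asyncio.run(main(['https://example.com'] * 10))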

Update:

Say I make 50 concurrent requests and they all finish in 2 seconds. That doesn't touch the limit (only 25 requests per second).

That would mean I should make 100 concurrent requests, and they would all finish in 2 seconds too (50 requests per second). But how can you be sure how long they will take to finish before you actually make the requests?

Or, if you don't mind the number of requests finished per second, but rather the number of requests made per second, you can:

async def loop_wrap(urls):
    for url in urls:
        asyncio.ensure_future(download(url))
        await asyncio.sleep(1/50)

asyncio.ensure_future(loop_wrap(urls))
loop.run_forever()

The code above will create a Future instance every 1/50 second.
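
For completeness, here is a self-contained variant of that idea which still lets the script exit; the download() coroutine and the URLs are placeholder assumptions, not part of the original answer:

import asyncio
import aiohttp

MAX_PER_SECOND = 50

async def download(session, url):
    # placeholder request; replace with real response handling
    async with session.get(url) as resp:
        return await resp.text()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = []
        for url in urls:
            # spread out task creation: at most MAX_PER_SECOND new requests per second
            tasks.append(asyncio.create_task(download(session, url)))
            await asyncio.sleep(1 / MAX_PER_SECOND)
        return await asyncio.gather(*tasks)

results = asyncio.run(main(['https://example.com'] * 10))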
    


19 votes
I solved the problem by creating a subclass of aiohttp.ClientSession() with a rate limiter based on the leaky-bucket algorithm. I use asyncio.Queue() for rate limiting instead of Semaphores, and I have overridden only the _request() method. I find this approach cleaner, since you only have to replace session = aiohttp.ClientSession() with session = ThrottledClientSession(rate_limit=15).
import asyncio
import time
from typing import Optional

import aiohttp


class ThrottledClientSession(aiohttp.ClientSession):
    """
    Rate-throttled client session class inherited from aiohttp.ClientSession

    USAGE:
        replace `session = aiohttp.ClientSession()`
        with `session = ThrottledClientSession(rate_limit=15)`

    see https://stackoverflow.com/a/60357775/107049
    """

    MIN_SLEEP = 0.1

    def __init__(self, rate_limit: float = None, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.rate_limit = rate_limit
        self._fillerTask = None
        self._queue = None
        self._start_time = time.time()
        if rate_limit is not None:
            if rate_limit <= 0:
                raise ValueError('rate_limit must be positive')
            self._queue = asyncio.Queue(min(2, int(rate_limit) + 1))
            self._fillerTask = asyncio.create_task(self._filler(rate_limit))

    def _get_sleep(self) -> Optional[float]:
        if self.rate_limit is not None:
            return max(1 / self.rate_limit, self.MIN_SLEEP)
        return None

    async def close(self) -> None:
        """Close rate-limiter's "bucket filler" task"""
        if self._fillerTask is not None:
            self._fillerTask.cancel()
            try:
                await asyncio.wait_for(self._fillerTask, timeout=0.5)
            except asyncio.TimeoutError as err:
                print(str(err))
        await super().close()

    async def _filler(self, rate_limit: float = 1):
        """Filler task to fill the leaky bucket algo"""
        try:
            if self._queue is None:
                return
            self.rate_limit = rate_limit
            sleep = self._get_sleep()
            updated_at = time.monotonic()
            fraction = 0
            extra_increment = 0
            for i in range(0, self._queue.maxsize):
                self._queue.put_nowait(i)
            while True:
                if not self._queue.full():
                    now = time.monotonic()
                    # tokens owed since the last refill; track the fractional
                    # part so low rates still accumulate tokens over time
                    increment = rate_limit * (now - updated_at)
                    fraction += increment % 1
                    extra_increment = fraction // 1
                    items_2_add = int(min(self._queue.maxsize - self._queue.qsize(),
                                          int(increment) + extra_increment))
                    fraction = fraction % 1
                    for i in range(0, items_2_add):
                        self._queue.put_nowait(i)
                    updated_at = now
                await asyncio.sleep(sleep)
        except asyncio.CancelledError:
            print('Cancelled')
        except Exception as err:
            print(str(err))

    async def _allow(self) -> None:
        if self._queue is not None:
            # debug
            # if self._start_time == None:
            #    self._start_time = time.time()
            await self._queue.get()
            self._queue.task_done()
        return None

    async def _request(self, *args, **kwargs) -> aiohttp.ClientResponse:
        """Throttled _request()"""
        await self._allow()
        return await super()._request(*args, **kwargs)
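
Usage is then the same as with a plain ClientSession; a minimal sketch (the URL is a placeholder):

async def fetch_all(urls):
    # drop-in replacement for aiohttp.ClientSession(), throttled to 15 requests/s
    async with ThrottledClientSession(rate_limit=15) as session:
        for url in urls:
            async with session.get(url) as resp:
                print(resp.status)

asyncio.run(fetch_all(['https://example.com'] * 5))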



4 votes

I achieve this by using a queue with a producer that produces new tasks at the rate limit, and then many consumers that will either all be waiting for the next job if they are fast, or have work backed up in the queue if they are slow; it then runs as fast as the processor/network allow:

import asyncio
from datetime import datetime


async def download(url):
    # download or whatever
    task_time = 1/10
    await asyncio.sleep(task_time)
    result = datetime.now()
    return result, url


async def producer_fn(queue, urls, max_per_second):
    for url in urls:
        await queue.put(url)
        await asyncio.sleep(1/max_per_second)


async def consumer(work_queue, result_queue):
    while True:
        url = await work_queue.get()
        result = await download(url)
        work_queue.task_done()
        await result_queue.put(result)


urls = range(20)


async def main():
    work_queue = asyncio.Queue()
    result_queue = asyncio.Queue()

    num_consumer_tasks = 10
    max_per_second = 5

    consumers = [asyncio.create_task(consumer(work_queue, result_queue))
                 for _ in range(num_consumer_tasks)]
    producer = asyncio.create_task(producer_fn(work_queue, urls, max_per_second))
    await producer

    # wait for the remaining tasks to be processed
    await work_queue.join()

    # cancel the consumers, which are now idle
    for c in consumers:
        c.cancel()

    while not result_queue.empty():
        result, url = await result_queue.get()
        print(f'{url} finished at {result}')


asyncio.run(main())



2 votes
There is a library named octopus-api (https://pypi.org/project/octopus-api/) that enables you to rate limit and set the number of (parallel) connection calls to the endpoint, using aiohttp under the hood. Its goal is to simplify all the aiohttp setup needed.

Here is an example of how to use it, where get_ethereum is the user-defined request function:

from octopus_api import TentacleSession, OctopusApi
from typing import Dict, List

if __name__ == '__main__':
    async def get_ethereum(session: TentacleSession, request: Dict):
        async with session.get(url=request["url"], params=request["params"]) as response:
            body = await response.json()
            return body

    client = OctopusApi(rate=50, resolution="sec", connections=6)
    result: List = client.execute(requests_list=[{
        "url": "https://api.pro.coinbase.com/products/ETH-EUR/candles?granularity=900&start=2021-12-04T00:00:00Z&end=2021-12-04T00:00:00Z",
        "params": {}}] * 1000, func=get_ethereum)
    print(result)

TentacleSession works the same way as you write POST, GET, PUT and PATCH for aiohttp.ClientSession.

Let me know if it helps with your issue related to rate limits and parallel calls.


0 votes
In my case I slow down the creation of the tasks with an await asyncio.sleep(1.1) before each create_task(). Any task created with create_task starts running immediately:

    for i in range(THREADS):
        await asyncio.sleep(1.1)
        tasks.append(
            asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
        )
    await asyncio.gather(*tasks) 

In the example below, another issue, limiting the number of simultaneous connections, is also solved by using the ClientSession() context in async_payload_wrapper and setting a limit on the connector.

With this setup I can run 25 coroutines (THREADS=25), each looping over a queue of URLs, without violating a 25 concurrent connection rule:

import asyncio
import random
import string
import time
from datetime import datetime

import aiohttp
from aiohttp import ClientSession, ClientTimeout

THREADS = 25   # number of concurrent workers/connections
urls = []      # assumed: fill with the URLs to fetch
errors = []


async def send_request(session, url, routine):
    start_time = time.time()
    print(f"{routine}, sending request: {datetime.now()}")
    params = {
        'api_key': 'nunya',
        'url': '%s' % url,
        'render_js': 'false',
        'premium_proxy': 'false',
        'country_code': 'us'
    }
    try:
        async with session.get(url='http://yourAPI.com', params=params) as response:
            data = await response.content.read()
            print(f"{routine}, done request: {time.time() - start_time} seconds")
            return data
    except asyncio.TimeoutError:
        print('timeout---------------------')
        errors.append(url)
    except aiohttp.ClientResponseError:
        print('request failed - Server Error')
        errors.append(url)
    except Exception:
        errors.append(url)


async def getData(session, q, test):
    while True:
        if not q.empty():
            url = q.get_nowait()
            resp = await send_request(session, url, test)
            if resp is not None:
                processData(resp, test, url)  # processData defined elsewhere
        else:
            print(f'{test} queue empty')
            break


async def async_payload_wrapper():
    tasks = []
    q = asyncio.Queue()
    for url in urls:
        await q.put(url)

    async with ClientSession(connector=aiohttp.TCPConnector(limit=THREADS),
                             timeout=ClientTimeout(total=61),
                             raise_for_status=True) as session:
        for i in range(THREADS):
            await asyncio.sleep(1.1)
            tasks.append(
                asyncio.create_task(getData(session, q, ''.join(random.choice(string.ascii_lowercase) for i in range(10))))
            )
        await asyncio.gather(*tasks)


if __name__ == '__main__':
    start_time = time.time()
    asyncio.run(async_payload_wrapper())
