限制Python并发http请求的最佳方法(无线程)?

问题描述 投票:0回答:3

我有兴趣为异步函数调用创建一个池(它们将是 HTTP 请求),但是我想在单个线程中完成所有操作。这样做的原因是产生多个线程会浪费资源(线程除了等待响应之外什么都不做)。

import asyncio
import aiohttp

import some_library as pool

POOL_LIMIT = 3

urls = ["example.com/28409078",
"example.com/31145880",
"example.com/54622752",
"example.com/48008963",
"example.com/82016326",
"example.com/75587921",
"example.com/2988065",
"example.com/47574087",
"example.com/13478021",
"example.com/46041669"]

def get(url):
  # return some promise here

# now perform the async operations
pool(limit=POOL_LIMIT, urls, get)


有没有可以为我管理异步池的Python库?在 Node.js 中,看起来有一个库可以做一些接近我想做的事情:https://github.com/rxaviers/async-pool

python asynchronous web-scraping
3个回答
3
投票

这里我使用基本的

asyncio
函数实现了一个池。

工作:

  • 池以 maxsize 任务开始
  • 当第一个任务完成时,它将下一个任务添加到队列中并打印其结果
  • 类似地,每完成一个任务,它就会添加另一个任务,直到达到 maxsize

代码:

import asyncio

async def pool(tasks, maxsize=3):
    pending = [asyncio.create_task(tasks.pop(0)) for _ in range(maxsize) if tasks]
    while pending:
        (done, pending) = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while True:
             if (not tasks) or (len(pending) >= maxsize):
                  break
             pending.add(asyncio.create_task(tasks.pop(0)))
        for task in done:
             print(task.result())
    print("POOL COMPLETED")

例如,您可以像这样创建任务和池:

async def work(index, sleep_time):
    await asyncio.sleep(sleep_time)
    return f"task {index} done"

tasks = [work(i, 1) for i in range(10)]

现在要运行任务,请调用 asyncio.run

asyncio.run(pool(tasks, 3))

这只会并行运行 3 个任务


0
投票

我不知道是否有一个流行的图书馆。这是一个简单的方法:

async def get(url):
  # return some promise here

async def processQueue():
  while len(urls):
    url = urls.pop()
    await get(url)

async def main():
  await asyncio.gather(
    processQueue(),
    processQueue(),
    processQueue()
  )

asyncio.run(main())

你可能需要在 pop() 之前加锁,我不确定。


0
投票

PaxPrz 的答案很完美,但如果有人感兴趣的话,我设法对其进行了更多美化

import asyncio
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


async def pool(tasks: list[Coroutine[Any, Any, T]], size=3):
    result: list[T] = []
    pending = [asyncio.create_task(task) for task in tasks[:size]]
    tasks = tasks[size:]
    while pending:
        (done, pending) = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        while tasks and len(pending) < size:
            pending.add(asyncio.create_task(tasks.pop(0)))
        for task in done:
            result.append(task.result())
    return result
© www.soinside.com 2019 - 2024. All rights reserved.