我有兴趣为异步函数调用创建一个池(它们将是 HTTP 请求),但是我想在单个线程中完成所有操作。这样做的原因是产生多个线程会浪费资源(线程除了等待响应之外什么都不做)。
import asyncio
import aiohttp
import some_library as pool
POOL_LIMIT = 3
urls = ["example.com/28409078",
"example.com/31145880",
"example.com/54622752",
"example.com/48008963",
"example.com/82016326",
"example.com/75587921",
"example.com/2988065",
"example.com/47574087",
"example.com/13478021",
"example.com/46041669"]
def get(url):
# return some promise here
# now perform the async operations
pool(limit=POOL_LIMIT, urls, get)
有没有可以为我管理异步池的Python库?在 Node.js 中,看起来有一个库可以做一些接近我想做的事情:https://github.com/rxaviers/async-pool
这里我使用基本的
asyncio
函数实现了一个池。
工作:
代码:
import asyncio
async def pool(tasks, maxsize=3):
pending = [asyncio.create_task(tasks.pop(0)) for _ in range(maxsize) if tasks]
while pending:
(done, pending) = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
while True:
if (not tasks) or (len(pending) >= maxsize):
break
pending.add(asyncio.create_task(tasks.pop(0)))
for task in done:
print(task.result())
print("POOL COMPLETED")
例如,您可以像这样创建任务和池:
async def work(index, sleep_time):
await asyncio.sleep(sleep_time)
return f"task {index} done"
tasks = [work(i, 1) for i in range(10)]
现在要运行任务,请调用 asyncio.run
asyncio.run(pool(tasks, 3))
这只会并行运行 3 个任务
我不知道是否有一个流行的图书馆。这是一个简单的方法:
async def get(url):
# return some promise here
async def processQueue():
while len(urls):
url = urls.pop()
await get(url)
async def main():
await asyncio.gather(
processQueue(),
processQueue(),
processQueue()
)
asyncio.run(main())
你可能需要在 pop() 之前加锁,我不确定。
PaxPrz 的答案很完美,但如果有人感兴趣的话,我设法对其进行了更多美化
import asyncio
from typing import Any, Coroutine, TypeVar
T = TypeVar("T")
async def pool(tasks: list[Coroutine[Any, Any, T]], size=3):
result: list[T] = []
pending = [asyncio.create_task(task) for task in tasks[:size]]
tasks = tasks[size:]
while pending:
(done, pending) = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
while tasks and len(pending) < size:
pending.add(asyncio.create_task(tasks.pop(0)))
for task in done:
result.append(task.result())
return result