我想下载一系列大文件(~200MB),并在下载文件时利用时间进行一些 CPU 密集型处理。我正在研究 asyncio 和 aiohttp。我的理解是我可以使用它们开始大量下载,然后在后台继续下载的同时在同一线程上进行一些繁重的计算。
然而,我发现下载暂停,而繁重的 CPU 进程继续进行,然后在计算完成后立即恢复。我在下面包括了一个最小的例子。我在脚本运行时直观地监视进程 CPU 和带宽。很明显,在大约 30 秒的计算期间下载暂停。难道我做错了什么?还是我不明白 aiohttp 可以做什么?
import asyncio
import time
import aiofiles
import aiohttp
async def download(session):
url = 'https://repo.anaconda.com/archive/Anaconda3-2022.10-Linux-s390x.sh' # 280 MB file
async with session.get(url) as resp:
async with aiofiles.open('./tmpfile', mode='wb') as f:
print('Starting the download')
data = await resp.read()
print('Starting the file write')
await f.write(data)
print('Download completed')
async def heavy_cpu_load():
await asyncio.sleep(5) # make sure the download has started
print('Starting the computation')
for i in range(200000000): # takes about 30 seconds on my laptop.
i ** 0.5
print('Finished the computation')
async def main():
async with aiohttp.ClientSession() as session:
timer = time.time()
tasks = [download(session), heavy_cpu_load()]
await asyncio.gather(*tasks)
print(f'All tasks completed in {time.time() - timer}s')
if __name__ == '__main__':
asyncio.run(main())
我想发生的事情是
aiohttp
确实完成了文件的下载,但是为了打开它并阅读它1,它需要GIL
来释放锁,但是CPU负载并没有释放 GIL
直到完成。
但是,如果你把
await asyncio.sleep(0)
2放在i ** 0.5
之后它会起作用。
await
只是为了确定是否有人想要控制 GIL。
如果你有一个不需要
await
的计算,那么你可以(并且应该)使用 loop.run_in_executor 在单独的线程中运行它,这样它就会在后台运行。
import asyncio
import time
import aiofiles
import aiohttp
async def download(session):
url = 'https://repo.anaconda.com/archive/Anaconda3-2022.10-Linux-s390x.sh' # 280 MB file
async with session.get(url) as resp:
async with aiofiles.open('./tmpfile', mode='wb') as f:
print('Starting the download')
data = await resp.read()
print('Starting the file write')
await f.write(data)
print('Download completed')
# not async
def heavy_cpu_load():
print('Starting the computation')
for i in range(200000000): # takes about 30 seconds on my laptop.
i ** 0.5
print('Finished the computation')
async def main():
async with aiohttp.ClientSession() as session:
timer = time.time()
tasks = [
download(session),
asyncio.get_running_loop().run_in_executor(None, func=heavy_cpu_load)
]
await asyncio.gather(*tasks)
print(f'All tasks completed in {time.time() - timer}s')
asyncio.run(main())