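In the code below, I am trying to find all files whose path contains the extension '.py'. I first do a breadth-first scan of the directories under the path, then recursively call the same function on the resulting list of subdirectories. I get an error saying: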
Future exception was never retrieved
future: <Future finished exception=TypeError("'generator' object is not callable")>
Traceback (most recent call last):
The code is given below:
import asyncio
from pathlib import Path
from os.path import sep as pathsep
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(4)

async def main(basedir, query):
    loop.run_in_executor(None, find_files(basedir, query))

@asyncio.coroutine
def find_files(path, query_string):
    subdirs = []
    for p in path.iterdir():
        fullpath = str(p.absolute())
        if p.is_dir() and not p.is_symlink():
            subdirs.append(p)
        if query_string in fullpath:
            print(fullpath)
    loop = asyncio.get_event_loop()
    tasks = [loop.run_in_executor(executor, find_files, subdir, query_string) for subdir in subdirs]  # this doesn't work
    yield from asyncio.gather(*tasks)
    return subdirs

query = '.py'
basedir = Path(pathsep).absolute()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_default_executor(executor)
    try:
        loop.run_until_complete(main(basedir, query))
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        executor.shutdown(wait=True)
        loop.close()
Edit: Thanks to the answer from user4815162342 below, I realized my mistake, and the following now works:
import asyncio
from pathlib import Path
from os.path import sep as pathsep
from collections import deque
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

query = '.py'
basedir = Path(pathsep).absolute()
futures = deque()

def find_files(path, query_string):
    # Plain blocking function: scans one directory and returns its subdirectories.
    subdirs = []
    try:
        for p in path.iterdir():
            fullpath = str(p.absolute())
            if p.is_dir() and not p.is_symlink():
                subdirs.append(p)
            if query_string in fullpath:
                print(fullpath)
    except OSError:
        # The handler is missing from the post; skipping unreadable
        # directories here is an assumption.
        pass
    return subdirs

async def main(executor):
    loop = asyncio.get_event_loop()
    # Scan the base directory in the executor and queue its subdirectories.
    task = [loop.run_in_executor(executor, find_files, basedir, query)]
    completed, _ = await asyncio.wait(task)
    result = [t.result() for t in completed]
    futures.append(result[0])
    # Breadth-first: each queue entry is a list of subdirectories to scan.
    while futures:
        future = futures.popleft()
        for subdir in future:
            task = [loop.run_in_executor(executor, find_files, subdir, query)]
            completed, _ = await asyncio.wait(task)
            result = [t.result() for t in completed][0]
            futures.append(task[0].result())

if __name__ == "__main__":
    tstart = time.time()
    executor = ThreadPoolExecutor(
        max_workers=4,
    )
    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            main(executor)
        )
    finally:
        event_loop.close()
    print('time elapsed is', time.time() - tstart)
Your code has several problems.
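First, run_in_executor expects a callable. It invokes that callable in another thread, suspends the current coroutine, and resumes it once the callable has returned a result (or raised an exception). In other words, run_in_executor needs a function, but you gave it a coroutine object instead (here a generator, because find_files is a generator-based coroutine), which is why you get an exception saying the object is not callable.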
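To illustrate the first point, here is a minimal sketch of the calling convention. The function blocking_sum is a hypothetical stand-in for any ordinary blocking function; it is not part of your code.

```python
import asyncio

def blocking_sum(a, b):
    # Hypothetical ordinary (non-async) function standing in for blocking work.
    return a + b

async def main():
    loop = asyncio.get_running_loop()
    # Pass the function itself, followed by its arguments.
    # Writing blocking_sum(2, 3) here would call it immediately and hand
    # run_in_executor the return value instead of a callable.
    return await loop.run_in_executor(None, blocking_sum, 2, 3)

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Passing None as the executor uses the loop's default thread pool.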
Second, you should await the result of run_in_executor. The missing await is the reason for the warning that the future's exception was never retrieved.
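As a rough sketch of why the await matters: when the returned future is awaited, an exception raised in the worker thread is re-raised in the coroutine, where it can be handled normally. The function failing_job is hypothetical and exists only to raise.

```python
import asyncio

def failing_job():
    # Hypothetical blocking function that always fails.
    raise ValueError("boom")

async def main():
    loop = asyncio.get_running_loop()
    try:
        # Awaiting the future propagates the worker's exception to this coroutine.
        await loop.run_in_executor(None, failing_job)
    except ValueError as exc:
        return f"caught: {exc}"
    return "no error"

if __name__ == "__main__":
    print(asyncio.run(main()))
```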
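Finally, the function you want to call is already a coroutine, so you don't need run_in_executor in the first place. run_in_executor is only for calling CPU-bound or legacy blocking code. Just await it directly: await find_files(...).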
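One way the direct-await version could look is sketched below. It is not a drop-in replacement for your code: returning the matches instead of printing them is my own choice to make the result easy to check, and because Path.iterdir is itself blocking, this version gains no parallelism from asyncio.

```python
import asyncio
from pathlib import Path

async def find_files(path, query_string):
    """Return every path under `path` whose string form contains `query_string`."""
    matches = []
    subdirs = []
    for p in path.iterdir():
        if query_string in str(p):
            matches.append(p)
        if p.is_dir() and not p.is_symlink():
            subdirs.append(p)
    # Await the recursive coroutine calls directly; no executor is involved.
    results = await asyncio.gather(
        *(find_files(subdir, query_string) for subdir in subdirs)
    )
    for sub_matches in results:
        matches.extend(sub_matches)
    return matches

if __name__ == "__main__":
    for match in asyncio.run(find_files(Path("."), ".py")):
        print(match)
```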
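Note that you should prefer async def and await over the asyncio.coroutine decorator and yield from. The latter are deprecated (asyncio.coroutine since Python 3.8) and the decorator was removed in Python 3.11.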