使用执行程序运行异步会引发我的返回类型无法调用的错误

问题描述 投票:0回答:1

在下面的代码中,我试图查找所有包含扩展名'.py'的文件。我先对路径中的目录列表进行广度搜索,然后在结果子目录列表上递归调用相同的函数。我得到一个错误,说

Future exception was never retrieved
future: <Future finished exception=TypeError("'generator' object is not callable")>
Traceback (most recent call last):

下面给出代码

from pathlib import Path
from os.path import sep as pathsep
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(4)

async def main(basedir, query):
    loop.run_in_executor(None, find_files(basedir, query) )

@asyncio.coroutine
def find_files(path, query_string):
    subdirs = []
    for p in path.iterdir():
            fullpath = str(p.absolute)
            if p.is_dir and not p.is_symlink():
                subdirs.append(p)
            if query_string in fullpath:
                print(fullpath)
    loop =  asyncio.get_event_loop()
    tasks = [loop.run_in_executor(executor, find_files, subdir, query_string) for subdir in subdirs] # this doesnt work
    yield from asyncio.gather(*tasks)
    return subdirs


query = '.py'
basedir = Path(pathsep).absolute() 

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.set_default_executor(executor)
    try:
        loop.run_until_complete(main(basedir, query))
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        executor.shutdown(wait = True)
        loop.close()

'''

编辑:我意识到下面来自user4815162342的错误,现在可以使用

import asyncio
from pathlib import Path
from os.path import sep as pathsep
from collections import deque
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

query = '.py'
basedir = Path(pathsep).absolute() 
futures = deque()

def find_files(path, query_string):
    subdirs = []
    try:
        for p in path.iterdir():
                fullpath = str(p.absolute)
                if p.is_dir and not p.is_symlink():
                    subdirs.append(p)
                if query_string in fullpath:
                    print(fullpath)
    return subdirs

async def main(executor):
    loop = asyncio.get_event_loop()
    task = [loop.run_in_executor(executor, find_files, basedir, query) ]
    completed, _ = await asyncio.wait(task)
    result = [t.result() for t in completed]
    futures.append(result[0])
    while futures:
        future = futures.popleft()
        for subdir in future:
            task = [loop.run_in_executor(executor, find_files, subdir, query) ]
            completed, _ = await asyncio.wait(task)
            result = [t.result() for t in completed][0]
            futures.append(task[0].result() )


if __name__ == "__main__":
    tstart = time.time()
    executor = ThreadPoolExecutor(
        max_workers=4,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            main(executor)
        )
    finally:
        event_loop.close()    
print('time elapsed is', time.time() - tstart)
python-3.x asynchronous python-asyncio concurrent.futures
1个回答
0
投票

您的代码有几个问题。

  • 首先,run_in_executor期望一个可调用对象,它将在另一个线程中调用该对象,暂停当前协程的执行,并在该可调用对象产生结果(或引发异常)后将其恢复。换句话说,run_in_executor需要一个function,而是给它提供了一个协程对象,这就是为什么您会收到一个异常,指出该对象不可调用。

  • 第二,您应该await run_in_executor的结果。缺少await的原因是,警告您永远不会检索到异常。

  • 最后,您要调用的函数已经是一个协程,因此您首先不需要run_in_executorrun_in_executor仅用于调用CPU绑定或旧版阻止代码。只需直接等待:await find_files(...)

请注意,您应该优先使用async defawait,而不是asyncio.coroutine装饰器和yield from。后者已过时,将很快删除。

© www.soinside.com 2019 - 2024. All rights reserved.