如何在 Jupyter Notebook 中的同步上下文中正确安排和等待异步代码中的结果?

问题描述 投票:0回答:1

我有一个小实用程序,可以使用

asyncio
并行调用同步代码。

import asyncio
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    if loop.is_running():
        print('running loop scheduling there')
        # implement the correct run inside the loop, without the run_until_complete which is crashing, because the loop already runs
        future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs),
                                                loop)
        print('got the future')
        res = future.result()
        print('got the result')
        return res
    else:
        return loop.run_until_complete(call_many_async(fun, many_kwargs))

并且在 python 中使用时效果很好

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))

来自 python 工作没有任何问题,但我在 Jupyter 中使用

IPython
时遇到问题,我只是得到

running loop scheduling there
got the future

它永远挂着。

原本我只是

def call_many(fun, many_kwargs):
    loop = asyncio.get_event_loop()
    return loop.run_until_complete(call_many_async(fun, many_kwargs))

但是我收到了错误

RuntimeError: This event loop is already running

如何解决?

当然是

results = await call_many_async(something_complex, ({"param": i} for i in range(1, 5)))
assert len(results) == 4

可以工作,但我想使用

call_many
作为更大代码库的一部分,我将从 jupyter 笔记本中调用该代码库。 我已阅读https://blog.jupyter.org/ipython-7-0-async-repl-a35ce050f7f7但我没有找到解决方案,因为我不想直接从jupyter笔记本调用异步代码单元格,但是来自同步代码。

python jupyter-notebook python-asyncio
1个回答
0
投票

jupyter 在主线程中运行自己的事件循环,同时 asyncio.run_coroutine_threadsafe 指出:

此函数应该从与 事件循环运行的地方。

我可能会在另一个线程中运行你的事件循环。

import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop

async def call_many_async(fun, many_kwargs):
    return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
    
def call_many(fun, many_kwargs):
    def run_func():
        loop = asyncio.new_event_loop()
        return loop.run_until_complete(call_many_async(fun, many_kwargs))
    thread = threading.Thread(target=run_func)
    thread.start()
    thread.join()

import time
def something_complex(param) -> int:
    print(f"call started with {param=}")
    time.sleep(0.1) # calling some time-costly API
    print("call ended")
    return 3 # returning the result

results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
© www.soinside.com 2019 - 2024. All rights reserved.