我有一个小实用程序,可以使用
asyncio
并行调用同步代码。
import asyncio
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop
async def call_many_async(fun, many_kwargs):
return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
def call_many(fun, many_kwargs):
loop = asyncio.get_event_loop()
if loop.is_running():
print('running loop scheduling there')
# implement the correct run inside the loop, without the run_until_complete which is crashing, because the loop already runs
future = asyncio.run_coroutine_threadsafe(call_many_async(fun, many_kwargs),
loop)
print('got the future')
res = future.result()
print('got the result')
return res
else:
return loop.run_until_complete(call_many_async(fun, many_kwargs))
并且在 python 中使用时效果很好
import time
def something_complex(param) -> int:
print(f"call started with {param=}")
time.sleep(0.1) # calling some time-costly API
print("call ended")
return 3 # returning the result
results = call_many(something_complex, ({"param": i} for i in range(1, 5)))
来自 python 工作没有任何问题,但我在 Jupyter 中使用
IPython
时遇到问题,我只是得到
running loop scheduling there
got the future
它永远挂着。
原本我只是
def call_many(fun, many_kwargs):
loop = asyncio.get_event_loop()
return loop.run_until_complete(call_many_async(fun, many_kwargs))
但是我收到了错误
RuntimeError: This event loop is already running
如何解决?
当然是
results = await call_many_async(something_complex, ({"param": i} for i in range(1, 5)))
assert len(results) == 4
可以工作,但我想使用
call_many
作为更大代码库的一部分,我将从 jupyter 笔记本中调用该代码库。
我已阅读https://blog.jupyter.org/ipython-7-0-async-repl-a35ce050f7f7但我没有找到解决方案,因为我不想直接从jupyter笔记本调用异步代码单元格,但是来自同步代码。
jupyter 在主线程中运行自己的事件循环,同时 asyncio.run_coroutine_threadsafe 指出:
此函数应该从与 事件循环运行的地方。
我可能会在另一个线程中运行你的事件循环。
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from asyncio import AbstractEventLoop, BaseEventLoop
async def call_many_async(fun, many_kwargs):
return await asyncio.gather(*[asyncio.to_thread(fun, **kwargs) for kwargs in many_kwargs])
def call_many(fun, many_kwargs):
def run_func():
loop = asyncio.new_event_loop()
return loop.run_until_complete(call_many_async(fun, many_kwargs))
thread = threading.Thread(target=run_func)
thread.start()
thread.join()
import time
def something_complex(param) -> int:
print(f"call started with {param=}")
time.sleep(0.1) # calling some time-costly API
print("call ended")
return 3 # returning the result
results = call_many(something_complex, ({"param": i} for i in range(1, 5)))