我正在尝试为Python中长期运行的任务生成轮询机制。为此,我正在使用并发的Future并使用.done()
进行轮询。该任务存在许多本身被阻塞的迭代,这些迭代被包装在异步函数中。我在调用第三方软件时,无法访问阻止功能的代码。这是我当前方法的一个最小示例:
import asyncio
import time
async def blocking_iteration():
time.sleep(1)
async def long_running():
for i in range(5):
print(f"sleeping {i}")
await blocking_iteration()
async def poll_run():
future = asyncio.ensure_future(long_running())
while not future.done():
print("before polling")
await asyncio.sleep(0.05)
print("polling")
future.result()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(poll_run())
loop.close()
其结果是:
before polling
sleeping 0
sleeping 1
sleeping 2
sleeping 3
sleeping 4
polling
[根据我对Python中异步机制的当前了解,我曾期望循环在第一次睡眠后解除阻塞,将控制权返回到该循环,该循环将返回到poll_run await
语句,并且只会运行该循环的第二次迭代。后续轮询后的long_running函数。所以所需的输出是这样的:
before polling
sleeping 0
polling
before polling
sleeping 1
polling
before polling
sleeping 2
polling
before polling
sleeping 3
polling
before polling
sleeping 4
polling
可以通过当前的方法以某种方式实现,还是可以以其他方式实现?
编辑
感谢@drjackild能够通过更改来解决它
async def blocking_iteration():
time.sleep(1)
进入
def blocking():
time.sleep(1)
async def blocking_iteration():
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, blocking)
time
是同步库,执行时会阻塞整个主线程。如果您的程序中有此类阻塞调用,则可以避免使用线程或进程池执行程序进行阻塞(您可以阅读有关其内容here)。或者,将blocking_iteration
更改为使用asyncio.sleep
而不是time.sleep
UPD。为了清楚起见,这里是非阻塞版本,该版本使用loop.run_in_executor
和默认执行程序。请注意,blocking_iteration
现在没有async
import asyncio
import concurrent.futures
import time
def blocking_iteration():
time.sleep(1)
async def long_running():
loop = asyncio.get_event_loop()
for i in range(5):
print(f"sleeping {i}")
await loop.run_in_executor(None, blocking_iteration)
async def poll_run():
task = asyncio.create_task(long_running())
while not task.done():
print("before polling")
await asyncio.sleep(0.05)
print("polling")
print(task.result())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(poll_run())
loop.close()