import queue
qq = queue.Queue()
qq.put('hi')
class MyApp():
def __init__(self, q):
self._queue = q
def _process_item(self, item):
print(f'Processing this item: {item}')
def get_item(self):
try:
item = self._queue.get_nowait()
self._process_item(item)
except queue.Empty:
pass
async def listen_for_orders(self):
'''
Asynchronously check the orders queue for new incoming orders
'''
while True:
self.get_item()
await asyncio.sleep(0)
a = MyApp(qq)
loop = asyncio.get_event_loop()
loop.run_until_complete(a.listen_for_orders())
使用Python 3.6。
我正在尝试编写一个事件处理程序,它不断地监听queue
中的消息,并处理它们(在这种情况下打印它们)。但它必须是异步的 - 我需要能够在终端(IPython)中运行它并手动将内容提供给queue
(至少最初是为了测试)。
此代码不起作用 - 它永远阻止。
如何让这个运行永远运行,但在每次while
循环迭代后返回控制?
谢谢。
注意:为了使事件循环与IPython(版本7.2)一起使用,我使用this库中的ib_insync
代码,我在上面的示例中使用此库来解决实际问题。
您需要使队列成为asyncio.Queue
,并以线程安全的方式将事物添加到队列中。例如:
qq = asyncio.Queue()
class MyApp():
def __init__(self, q):
self._queue = q
def _process_item(self, item):
print(f'Processing this item: {item}')
async def get_item(self):
item = await self._queue.get()
self._process_item(item)
async def listen_for_orders(self):
'''
Asynchronously check the orders queue for new incoming orders
'''
while True:
await self.get_item()
a = MyApp(qq)
loop = asyncio.get_event_loop()
loop.run_until_complete(a.listen_for_orders())
您的其他线程必须将内容放入队列中,如下所示:
loop.call_soon_threadsafe(qq.put_nowait, <item>)
call_soon_threadsafe
将确保正确锁定,并且当新队列项准备就绪时唤醒事件循环。
这不是异步队列。你需要使用asyncio.Queue
qq = queue.Queue()
Async是一个事件循环。你调用循环转移控制它,它循环,直到你的功能完成,永远不会发生:
loop.run_until_complete(a.listen_for_orders())
你评论说:
我有另一个线程轮询外部网络资源的数据(I / O密集)并将传入的消息转储到此线程。
编写该代码异步 - 所以你有:
async def run():
while 1:
item = await get_item_from_network()
process_item(item)
loop = asyncio.get_event_loop()
loop.run_until_complete( run() )
如果您不想这样做,那么您可以做的就是逐步完成循环,尽管您不想这样做。
import asyncio
def run_once(loop):
loop.call_soon(loop.stop)
loop.run_forever()
loop = asyncio.get_event_loop()
for x in range(100):
print(x)
run_once(loop)
然后你只需调用你的异步函数,每次调用run_once时,如果队列中有一个项目,它将检查你的(asyncio队列)并将控制传递给你的listen for orders函数。