使用Async的Python事件处理程序(非阻塞while循环)

问题描述 投票:3回答:2
import queue

qq = queue.Queue()
qq.put('hi')

class MyApp():

    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    def get_item(self):
        try:
            item = self._queue.get_nowait()
            self._process_item(item)
        except queue.Empty:
            pass

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            self.get_item()
            await asyncio.sleep(0)      

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

使用Python 3.6。

我正在尝试编写一个事件处理程序,它不断地监听queue中的消息,并处理它们(在这种情况下打印它们)。但它必须是异步的 - 我需要能够在终端(IPython)中运行它并手动将内容提供给queue(至少最初是为了测试)。

此代码不起作用 - 它永远阻止。

如何让这个运行永远运行,但在每次while循环迭代后返回控制?

谢谢。

注意:为了使事件循环与IPython(版本7.2)一起使用,我使用this库中的ib_insync代码,我在上面的示例中使用此库来解决实际问题。

python python-3.x python-asyncio
2个回答
2
投票

您需要使队列成为asyncio.Queue,并以线程安全的方式将事物添加到队列中。例如:

qq = asyncio.Queue()

class MyApp():
    def __init__(self, q):
        self._queue = q

    def _process_item(self, item):
        print(f'Processing this item: {item}')

    async def get_item(self):
        item = await self._queue.get()
        self._process_item(item)

    async def listen_for_orders(self):  
        '''
        Asynchronously check the orders queue for new incoming orders
        '''
        while True:
            await self.get_item()

a = MyApp(qq)

loop = asyncio.get_event_loop()

loop.run_until_complete(a.listen_for_orders())

您的其他线程必须将内容放入队列中,如下所示:

loop.call_soon_threadsafe(qq.put_nowait, <item>)

call_soon_threadsafe将确保正确锁定,并且当新队列项准备就绪时唤醒事件循环。


2
投票

这不是异步队列。你需要使用asyncio.Queue

qq = queue.Queue()

Async是一个事件循环。你调用循环转移控制它,它循环,直到你的功能完成,永远不会发生:

loop.run_until_complete(a.listen_for_orders())

你评论说:

我有另一个线程轮询外部网络资源的数据(I / O密集)并将传入的消息转储到此线程。

编写该代码异步 - 所以你有:

async def run():
    while 1:
        item = await get_item_from_network()
        process_item(item)

loop = asyncio.get_event_loop()
loop.run_until_complete( run() )

如果您不想这样做,那么您可以做的就是逐步完成循环,尽管您不想这样做。

import asyncio


def run_once(loop):
    loop.call_soon(loop.stop)
    loop.run_forever()


loop = asyncio.get_event_loop()

for x in range(100):
    print(x)
    run_once(loop)

然后你只需调用你的异步函数,每次调用run_once时,如果队列中有一个项目,它将检查你的(asyncio队列)并将控制传递给你的listen for orders函数。

© www.soinside.com 2019 - 2024. All rights reserved.