确保aio_pika消费者永远与FastAPI一起运行

问题描述 投票:0回答:1

我编写了一个 aio_pika 消费者任务,该任务应该在 FastAPI 应用程序中永远运行。此任务是实现 pub/sub 模式:

的管理器对象的一部分
from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage

class MyManager(object):
    def __init__(self):
        self.waiter = asyncio.Future()
    
    def publish(self, value):
        waiter, self.waiter = self.waiter, asyncio.Future()
        waiter.set_result((value, self.waiter))

    async def subscribe(self):
        waiter = self.waiter
        while True:
            value, waiter = await waiter
            yield value

    __aiter__ = subscribe

    async def on_message(self, message: AbstractIncomingMessage) -> None:
        try:
            async with message.process():
                # do whatever deserialization of the received item
                item = json.loads(message.body)
                # share the item with subscribers
                self.publish(item)

        except Exception as e:
            logger.error(e, exc_info=True)

    async def run(self):
        connection = await connect_robust(
            settings.amqp_url,
            loop=asyncio.get_running_loop()
        )
        channel = await connection.channel()
        my_queue = await channel.get_queue('my-queue')

        await my_queue.consume(self.on_message)

        await asyncio.Future()

        await connection.close()

此消费者任务是在 FastAPI 应用程序启动期间创建的:

my_manager = asyncio.Future()

@app.on_event("startup")
async def on_startup():
    my_manager.set_result(MyManager())
    task = asyncio.create_task((await my_manager).run())

请注意,管理器仅在

on_startup
期间实例化,以确保存在现有的 asyncio 循环。

不幸的是,该任务在几周/几个月后停止工作。我无法记录是什么事件导致了这个。我不确定任务是否崩溃,或者与 AMQP 服务器的连接是否在没有重新连接的情况下断开。我什至不知道如何/在哪里捕获/记录问题。

此问题的原因可能是什么以及如何解决?

作为附加上下文,管理器用于“服务器发送事件”路由: @router.get('/items') async def items_stream(request: Request): async def event_publisher(): try: aiter = (await my_manager).__aiter__() while True: task = asyncio.create_task(aiter.__anext__()) event = await asyncio.shield(task) yield dict(data=event) except asyncio.CancelledError as e: print(f'Disconnected from client (via refresh/close) {request.client}') raise e return EventSourceResponse(event_publisher())

异步迭代器被屏蔽以防止
此处描述的问题

python python-asyncio fastapi pika python-pika
1个回答
0
投票

我认为跟踪或解决问题的唯一方法是添加记录器并在 Manager 类中记录每个步骤。

只需创建一个文件记录器并在 Manager 类中进行大量日志记录,这是找到崩溃根本原因的唯一方法。

如果连接关闭或丢失,您还可以查看服务器日志。

我不知道 aio-pika,但对我来说,你的代码似乎缺少重新连接逻辑。如果连接在运行方法中关闭,你在哪里重新启动它?

© www.soinside.com 2019 - 2024. All rights reserved.