我编写了一个 aio_pika 消费者任务,该任务应该在 FastAPI 应用程序中永远运行。此任务是实现 pub/sub 模式:
的管理器对象的一部分from aio_pika import connect_robust
from aio_pika.abc import AbstractIncomingMessage
class MyManager(object):
def __init__(self):
self.waiter = asyncio.Future()
def publish(self, value):
waiter, self.waiter = self.waiter, asyncio.Future()
waiter.set_result((value, self.waiter))
async def subscribe(self):
waiter = self.waiter
while True:
value, waiter = await waiter
yield value
__aiter__ = subscribe
async def on_message(self, message: AbstractIncomingMessage) -> None:
try:
async with message.process():
# do whatever deserialization of the received item
item = json.loads(message.body)
# share the item with subscribers
self.publish(item)
except Exception as e:
logger.error(e, exc_info=True)
async def run(self):
connection = await connect_robust(
settings.amqp_url,
loop=asyncio.get_running_loop()
)
channel = await connection.channel()
my_queue = await channel.get_queue('my-queue')
await my_queue.consume(self.on_message)
await asyncio.Future()
await connection.close()
此消费者任务是在 FastAPI 应用程序启动期间创建的:
my_manager = asyncio.Future()
@app.on_event("startup")
async def on_startup():
my_manager.set_result(MyManager())
task = asyncio.create_task((await my_manager).run())
请注意,管理器仅在
on_startup
期间实例化,以确保存在现有的 asyncio 循环。
不幸的是,该任务在几周/几个月后停止工作。我无法记录是什么事件导致了这个。我不确定任务是否崩溃,或者与 AMQP 服务器的连接是否在没有重新连接的情况下断开。我什至不知道如何/在哪里捕获/记录问题。
此问题的原因可能是什么以及如何解决?
作为附加上下文,管理器用于“服务器发送事件”路由:
@router.get('/items')
async def items_stream(request: Request):
async def event_publisher():
try:
aiter = (await my_manager).__aiter__()
while True:
task = asyncio.create_task(aiter.__anext__())
event = await asyncio.shield(task)
yield dict(data=event)
except asyncio.CancelledError as e:
print(f'Disconnected from client (via refresh/close) {request.client}')
raise e
return EventSourceResponse(event_publisher())
异步迭代器被屏蔽以防止此处描述的问题
我认为跟踪或解决问题的唯一方法是添加记录器并在 Manager 类中记录每个步骤。
只需创建一个文件记录器并在 Manager 类中进行大量日志记录,这是找到崩溃根本原因的唯一方法。
如果连接关闭或丢失,您还可以查看服务器日志。
我不知道 aio-pika,但对我来说,你的代码似乎缺少重新连接逻辑。如果连接在运行方法中关闭,你在哪里重新启动它?