Python QPID Proton 容器阻止代码执行

问题描述 投票:0回答:1

我有一个 Python 3.9 项目,我想在其中运行一个连接到 AMQP 1.0 代理并侦听传入消息的服务。相同的服务还应该提供 RESTful API,以便触发消息发送到代理。

我使用的图书馆:

  • python-qpid-质子(AMQP)
  • fastapi(API)

这是消息接收器初始化和启动的示例:

from proton.handlers import MessagingHandler
from proton.reactor import Container
 
class Receiver(MessagingHandler):
    def __init__(self, url, address):
        super(Receiver, self).__init__()
        self.url = url
        self.address = address
 
    def on_start(self, event):
        event.container.connect(self.url)
 
    def on_connection_opened(self, event):
        print("Connected to {}".format(self.url))
        event.container.create_receiver(event.connection, self.address)
 
    def on_message(self, event):
        print("Received message: {}".format(event.message.body))
 
 
url = "amqps://user:pass@localhost:5671"
address = "example"
 
handler = Receiver(url, address)
receiving_container = Container(handler)
 
try:
    receiving_container.run()
    print("Container started")
except KeyboardInterrupt:
    pass

现在我的问题是,一旦我使用 run() 启动receive_container,它就会阻止在侦听消息时执行任何其他代码。这可以防止我同时运行除 receive_container 之外的任何其他内容。

我知道有 asyncio 可以帮助解决此类问题,我已经尝试过但没有得出任何结论。我可以使用 Uvicorn 异步运行 FastAPI,但我相信容器本身必须以异步方式实现才能与 asyncio 一起工作。

有人知道这个问题的解决方案/解决方法吗?

python python-asyncio amqp qpid
1个回答
0
投票

FastAPI 提供了

asyncio
接口,因此您可以将其用于 HTTP API 服务。因此,请使用
asyncio
事件循环来处理您的 HTTP API,然后当您收到应转发到消息代理的请求时,您可以使用
asyncio.to_thread(<whatever sends a message to the broker>)
来执行此操作。

由于

receiving_container.run()
希望拥有其运行的整个线程,因此您需要启动
asyncio
事件循环,在其自己的线程上运行 HTTP API。我已经在这里解释了如何做到这一点:

或者您可以在其自己的线程上启动 AMQP 消息服务,让 HTTP API

asyncio
事件循环在主线程上运行。

© www.soinside.com 2019 - 2024. All rights reserved.