我正在努力从持久的 Python 函数中的队列中获取 Azure ServiceBus 消息。我之前已经能够通过非持久性的 Python Azure Function 执行此操作,但我的工作流程需要持久性方面。鉴于此,我就采用了我原来写的获取消息的函数 -
@app.activity_trigger(input_name="queue")
def get_messages(queue: str) -> list:
try:
async def run():
sb_client = ServiceBusClient.from_connection_string(
conn_str=os.environ.get("SERVICEBUS_CONN_STR")
)
logger.info(f"client - {sb_client}")
with sb_client:
receiver = sb_client.get_queue_receiver(queue_name=queue)
with receiver:
received_messages = receiver.receive_messages(
max_wait_time=5,
max_message_count=1
)
if len(received_messages) > 0:
for msg in received_messages:
print(f"Received - {msg}")
receiver.complete_message(msg)
return received_messages
else:
return {"messageCount": 0}
asyncio.run(run())
except Exception as e:
logger.warning(f"Unable to receive messages - {e}")
return {"item": "yup"}
添加了 Activity_triggger 的功能(忽略返回,目前需要一个可序列化的 JSON),以便在持久函数中使用它。
我收到一条错误消息,仅显示
__enter__
。我之前一直试图在我们的代码中构建一个充当 ServiceBus 助手的类,因此对 __enter__
问题进行了大量阅读和研究。我最终放弃了这条路径,因为我遇到了与在我自己的类中使用 ServiceBusClient 类相关的错误。
我尝试在没有 with 语句的情况下运行该函数,删除了 async/await 部分等。如果没有任何 async/await,我会遇到与线程相关的运行时错误。一旦我重新添加异步/等待,我就会收到此错误,因为缺少
__enter__
方法。
我的期望是,如果我不构建一个类,我不应该被要求提供
__enter__
方法。我希望找到其他人可能有的关于从持久函数接收队列消息的一些示例(据我所知,文档非常缺乏。
sb_client
或 receiver
均未定义为上下文管理器,因此您不能使用 with
。这两行是你的问题。
我认为你根本不需要
with
块。只需删除它们并取消代码缩进即可