无法从正在运行的事件循环中调用 asyncio.run()

问题描述 投票:0回答:1

我有一个在 uvicorn 服务器内运行的 python 应用程序。我已经创建了一个 Pub/sub 订阅者并尝试从我的 main.py 中启用它。我正在使用流式拉取订阅。现在,我的要求是,一旦创建了订阅者,控制权应该返回到 main.py,而不是在订阅者监听事件时被阻止

我的订阅者的代码如下-

from google.cloud import pubsub_v1
from app.services.subscription_service import save_bill_events
from app.utils import constants
from app.utils.logging_tracing_manager import get_logger
import traceback

print("Entered in bill_subscriber----------------------")
logger = get_logger(__file__)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    save_bill_events(message.data)
    message.ack()


async def create_bill_subscriber():
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path("{projectId}",
                                                     constants.BILL_EVENT_SUBSCRIPTION_ID)
    # Limit the subscriber to only have fixed number of  outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=50)
    streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control)
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result()
        except Exception as e:
            # Even in case of an exception, subscriber should keep listening
            logger.error(
                f"An error occurred while pulling message from subscription {constants.BILL_EVENT_SUBSCRIPTION_ID}",
                exc_info=True)
            traceback.print_exc()
            pass

从我的 main.py 中,我尝试使用 asyncio 调用上述方法

asyncio.run(main=bill_subscriber.create_bill_subscriber())

但我看到一个错误

RuntimeError: asyncio.run() cannot be called from a running event loop
。我是否没有正确使用
asyncio.run()

uvicorn 是否有可能在事件循环内运行应用程序,因此我们无法启动另一个事件循环?如果是这样的话,还有其他方法可以在后台启动订阅者吗?

python async-await python-asyncio google-cloud-pubsub publish-subscribe
1个回答
0
投票

uvicorn 是否有机会在事件循环中运行应用程序 因此我们无法启动另一个事件循环?如果是这样的话,就是 还有其他方法可以在后台启动订阅者吗?

是的,

uvicorn
的整个想法是它运行一个异步循环,并且可以选择将它们的视图编写为异步协同例程 - 然后它确实为每个请求使用一个异步任务,而不是更昂贵的,系统资源术语,线程。

可以简单地将新任务添加到 uvicorn 使用的正在运行的 asyncio 循环中,并且只需小心处理在“作为请求视图的任务”默认值之外创建的任务的踪迹。 Python 的 asyncio 模型实际上就是为此而设计的。

您只需更改对函数的调用,这样 它从内部工作并且已经在运行循环

import asyncio
...

my_tasks = set()

def someview(...):
     ...
     # instead of:
     # asyncio.run(main=bill_subscriber.create_bill_subscriber())
     task = asyncio.create_task(bill_subscriber.create_bill_subscriber())
     my_tasks.add(task)
     task.add_done_callback(my_tasks.discard)
     ...

使用一个集合来保存在视图返回后应继续运行的任务的想法是,asyncio 需要您保留对此类任务的引用。上面的回调在完成时删除对其的引用应该足以避免任何资源泄漏。

此外,根据您在任务中执行的操作,您可能希望在自定义上下文中运行 - 检查 https://docs.python.org/3/library/contextvars.html 以了解该部分,如果你觉得需要它。

© www.soinside.com 2019 - 2024. All rights reserved.