考虑使用像这样的
lifespan
参数的 FastAPI:
def lifespan(app):
print('lifespan start')
yield
print('lifespan end')
app = FastAPI(lifespan=lifespan)
现在我想注册一个具有自己的生命周期功能的子应用程序:
app.mount(mount_path, sub_app)
如何为子应用程序注册启动/关闭处理程序?
我能找到的所有解决方案要么需要控制
lifespan
生成器(我没有),要么涉及 add_event_handler
等已弃用的方法(当设置 lifespan
时不起作用)。
更新最小可重现示例:
from fastapi import FastAPI
# --- main app ---
def lifespan(_):
print("startup")
yield
print("shutdown")
app = FastAPI(lifespan=lifespan)
@app.get("/")
async def root():
return {"message": "Hello World"}
# --- sub app ---
sub_app = FastAPI()
@sub_app.get("/")
async def sub_root():
return {"message": "Hello Sub World"}
app.mount("/sub", sub_app)
app.on_event("startup")(lambda: print("sub startup")) # doesn't work
app.on_event("shutdown")(lambda: print("sub shutdown")) # doesn't work
跑步:
uvicorn my_app:app --port 8000
我找到了一个解决方案,但我不确定我是否喜欢它...它通过
app.router.lifespan_context
访问现有的寿命生成器,并用附加的启动/关闭命令包装它:
from contextlib import asynccontextmanager
...
main_app_lifespan = app.router.lifespan_context
@asynccontextmanager
async def lifespan_wrapper(app):
print("sub startup")
async with main_app_lifespan(app) as maybe_state:
yield maybe_state
print("sub shutdown")
app.router.lifespan_context = lifespan_wrapper
输出:
INFO: Waiting for application startup.
sub startup
startup
INFO: Application startup complete.
...
INFO: Shutting down
INFO: Waiting for application shutdown.
shutdown
sub shutdown
INFO: Application shutdown complete.