我有一个函数希望作为后台任务运行,并将其添加到如下所示的端点上。使用返回值,我在第二个端点上轮询状态,该状态在我作为后台任务传递的函数中更新。
@app.post("/deploy")
async def deploy_vm(data: DeploymentParameters, background_tasks: BackgroundTasks):
deployment = Deployment.objects(name=data.vm.name).first()
if deployment and not data.settings.replace:
return {"msg": "Deployment Task already in progress", "id": deployment.id_}
deployment = Deployment(name=data.vm.name, payload=data.model_dump_json())
deployment.save()
background_tasks.add_task(perform_syncronous_deployment, data, deployment)
return {
"msg": "Deployment Task accepted",
"id": deployment.id_,
"tasks": deployment.tasks
}
但是,该函数永远不会被调用,并且该任务也不会显示在我用来轮询状态的端点上:
@app.get("/deploy/{id_}/status")
async def get_deployment_status(id_: str, background_tasks: BackgroundTasks):
deployment = Deployment.get_status(id_=id_).first()
for task in background_tasks.tasks:
logger.info(task.__dict__)
return deployment.status.to_json()
原始函数使用
concurrent.futures.ThreadPoolExecutor
同时执行一些操作,但即使我使其完全同步,它也不会被调用。
真正奇怪的是,当我在 jupyter 中使用普通的 FastAPI 时它就可以工作(端点和函数是相同的)——唯一的区别是我在应用程序中使用路由器,而不是直接向应用程序注册端点。
轮询有效,但该函数未运行,并且我没有看到任何错误消息。当我在任何端点设置断点并直接调用该函数时,它也可以工作。
我尝试使用调试器并从 cli 运行应用程序。
如果有任何关于我还可以尝试什么的提示,我将不胜感激。
提前谢谢您
该问题是由自定义日志记录路由器引起的。 从自定义 api 路由器启动后台任务 为我指明了正确的方向:
class LoggingRoute(APIRoute):
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> Response:
req_body = await request.body()
log_request(req_body, request.url.path, request.method)
response = await original_route_handler(request)
if response.background:
response.background.add_task(BackgroundTask(log_response, response.body))
else:
response.background = BackgroundTask(log_response, response.body)
return response
return custom_route_handler
最初它只将日志响应任务(if/else语句的else部分)分配给后台,取代了所有其他后台任务。