Baskground任务FastAPI

问题描述 投票:0回答:1

我尝试使用 FastAPI 后台任务发送电子邮件,但电子邮件未发送。控制台中没有错误,其余代码运行正常。

端点

@user_router.post("/register", tags=["Auth"])
    async def register(user_data: UserCreate, service: UserService = Depends()):
    return await service.register_user(user_data)

注册逻辑和任务:

 async def register_user(self, data: dict) -> dict:
        ...
        result = await self.crud.create_model(self.INSTANCE, user_data)
        task = BackgroundTasks()
        task.add_task(self.email.send_email_verification, data.email)

发送邮件验证方法:

  async def send_email_verification(self, email: str):
        ...
        message = MessageSchema(
            recipients=[email],
            subject="Your email verification link",
            body=body,
            subtype="html",
        )

        await self.mail.send_message(message)

我认为问题可能出在电子邮件功能上,所以我用一个简单的函数替换了该任务,将文本打印到控制台。因此,问题可能与异步处理有关。

async-await fastapi background-task
1个回答
0
投票

问题的产生是因为

BackgroundTasks
不是 路径操作函数的参数
BackgroundTasks
应该通过依赖注入来工作。

以下是调整代码的方法:

@user_router.post("/register", tags=["Auth"])
async def register(
    user_data: UserCreate,
    background_tasks: BackgroundTasks,
    service: UserService = Depends()
):
    return await service.register_user(user_data, background_tasks)
async def register_user(self, data: dict, background_tasks: BackgroundTasks) -> dict:
    ...
    result = await self.crud.create_model(self.INSTANCE, user_data)
    background_tasks.add_task(self.email.send_email_verification, data.email)
© www.soinside.com 2019 - 2024. All rights reserved.