我正在尝试并行化在我的 ORM 挂钩之一中完成的一些工作。每当创建或修改模型时,我都想调用第三方系统来反映该更新。正如您所想象的那样,这个调用相当昂贵,我不希望它阻止调用者进行模型更改。此外,原始呼叫者不关心或不需要知道对第三方的呼叫是否成功/失败/等。
所以我们想要这样的东西:
class User(Base):
...
@event.listens_for(User, 'after_insert')
def update_third_party(mapper, connect, target):
third_party_client.create_user(target) # async and non-blocking
return
我已经看到并成功尝试过使用
BackgroundTasks
直接绑定到 FastApi 端点...但是通过 api 调用启动此更新感觉不正确(如果关联的 API 创建了 ,则很容易错过此更新) User
并且开发人员忘记在那里添加 background_task
)。对于建立更传统的作业队列基础设施(例如 celery/kafka)来说,这也感觉有点矫枉过正。
是否有一些相当简单的库可以完成与我正在寻找的类似的事情?
似乎
asyncio.run_in_executor
实现了我想要实现的目标。所以类似:
class User(Base):
...
@event.listens_for(User, 'after_insert')
def update_third_party(mapper, connect, target):
loop = asyncio.get_running_loop()
loop.run_in_executor(None, third_party_client.create_user, target)
return