我想进行异步数据库查询并将其插入到
cls.__setitem__
中,但是当然,我不能在同步函数中使用 await
。我该如何解决这个问题?
找到了解决方案。不可能创建异步 dunder 方法,因此我们可以做的是使用异步但不是 dunder 的方法。
class Foo:
async def setitem(key, item):
await for_some_stuff()
是我的案例的解决方案
异步函数实际上只是返回可等待内容的函数。通常是用于惰性求值(等待时)的协程,或者用于急切求值的 Future 或 Task。以下是我如何执行异步 dunder 方法:
import asyncio
from collections.abc import Coroutine
from typing import Any
class Foo:
def __getitem__(self, delay: float) -> Coroutine[Any, Any, str]:
async def get():
print("waiting...")
await asyncio.sleep(delay)
return "Hello!"
return get()
async def main():
foo = Foo()
print(await foo[1.5])
asyncio.run(main())