我计划使用 APScheduler 持续处理调度任务,并且我正在尝试创建一个使用现有数据库的自定义作业存储,该数据库依赖于异步功能(就像我的应用程序的整个其余部分一样),并且在 APS 中不受本机支持。我需要为作业存储实现的所有方法都不是异步的,并且我绝对不想在等待数据库响应时阻止应用程序的其他部分运行。
创建任务允许异步函数无阻塞地运行,但我们无法从中获取任何输出。
class JobStore(BaseJobStore):
# --snip--
def add_job(self, job: Job):
s = SerializedJob.create(job)
asyncio.get_event_loop().create_task(self.db_connection.store(s))
忙于等待任务完成会阻止应用程序的其余部分运行。
class JobStore(BaseJobStore):
# --snip--
def lookup_job(self, job_id: str):
fetch_task = asyncio.get_event_loop().create_task(
self.db_connection.fetch(SerializedJob, f"SerializedJob:{job_id}")
)
while not fetch_task.done(): pass # THIS BLOCKS EXECUTION! NO BUENO!
if exc := fetch_task.exception():
raise exc
res: SerializedJob = fetch_task.result()
return SerializedJob.to_aps_job()
使用
AbstractEventLoop.run_until_complete()
?没有。该循环已经在运行应用程序的其余部分。
我是否只能放弃迁移或运行两个不同的数据库?或者我的异步依赖数据库还有希望吗?