我在 FastAPI 应用程序中有以下类:
import asyncio
import logging
from multiprocessing import Lock, Process
from .production_status import Job as ProductionStatusJob
class JobScheduler:
loop = None
logger = logging.getLogger("job_scheduler")
process_lock = Lock()
JOBS = [ProductionStatusJob]
@classmethod
def start(cls) -> None:
cls.logger.info("Starting Up (1/2)")
Process(target=cls._loop).start()
@classmethod
def _loop(cls) -> None:
cls.loop = asyncio.get_event_loop()
cls.loop.create_task(cls._run())
cls.logger.info("Startup Complete (2/2)")
cls.loop.run_forever()
cls.loop.close()
@classmethod
async def _run(cls) -> None:
while True:
...
@classmethod
async def stop(cls) -> None:
cls.logger.info("Shutting Down (1/2)")
with cls.process_lock:
cls.loop.stop() # <= This Line
cls.loop.close()
cls.logger.info("Shutdown Complete (2/2)")
cls.loop = None
在 FastAPI 应用程序的
startup
和 shutdown
事件上,将调用 JobScheduler.start()
和 JobScheduler.stop()
方法。开始很顺利,但是,在 stop
期间,我收到此错误:
File "/backend/app/main.py", line 146, in stop_job_scheduler
2023-08-16 11:46:27 await job_scheduler.stop()
2023-08-16 11:46:27 File "/backend/app/jobs/__init__.py", line 59, in stop
2023-08-16 11:46:27 cls.loop.stop()
2023-08-16 11:46:27 AttributeError: 'NoneType' object has no attribute 'stop'
我觉得这个bug很奇怪,因为我在
cls.loop
期间设置了_loop()
(在start()
结束时执行),所以为什么类变量仍然被设置为其默认的None
值当拨打.stop()
时?
顺便说一句,当应用程序想要关闭以清理后台进程时,我试图关闭事件循环,所以如果您有更多建议,我很高兴听到它们!干杯。
multiprocessing
很有趣。它比多线程更强大,但也有一些注意事项。首先,您实际上正在运行一个完全不同的 Python 解释器。这意味着全局变量等将为您运行的每个进程获取一个新副本。
Manager
在进程之间显式同步数据。这有点像两个进程都连接到的本地服务器。对于更明确的发布-订阅数据,您还可以使用 Queue
将信息从一个进程传递到另一个进程。