我在 FastAPI 应用程序中有以下类:
import asyncio
import logging
from multiprocessing import Lock, Process
from .production_status import Job as ProductionStatusJob
class JobScheduler:
loop = None
logger = logging.getLogger("job_scheduler")
process_lock = Lock()
JOBS = [ProductionStatusJob]
@classmethod
def start(cls) -> None:
cls.logger.info("Starting Up (1/2)")
Process(target=cls._loop).start()
@classmethod
def _loop(cls) -> None:
cls.loop = asyncio.get_event_loop()
cls.loop.create_task(cls._run())
cls.logger.info("Startup Complete (2/2)")
cls.loop.run_forever()
cls.loop.close()
@classmethod
async def _run(cls) -> None:
while True:
...
@classmethod
async def stop(cls) -> None:
cls.logger.info("Shutting Down (1/2)")
with cls.process_lock:
cls.loop.stop() # <= This Line
cls.loop.close()
cls.logger.info("Shutdown Complete (2/2)")
cls.loop = None
在 FastAPI 应用程序的
startup
和 shutdown
事件上,将调用 JobScheduler.start()
和 JobScheduler.stop()
方法。
start
方法运行顺利,但在stop
中我收到错误:
File "/backend/app/main.py", line 146, in stop_job_scheduler
2023-08-16 11:46:27 await job_scheduler.stop()
2023-08-16 11:46:27 File "/backend/app/jobs/__init__.py", line 59, in stop
2023-08-16 11:46:27 cls.loop.stop()
2023-08-16 11:46:27 AttributeError: 'NoneType' object has no attribute 'stop'
但是
cls.loop
是在 _loop
方法期间设置的(在 start
结束时执行) - 那么为什么在调用 cls.loop
方法时 None
仍然具有其初始 stop
值?
当FastAPI应用程序调用
shutdown
时,有没有更好的方法来清理后台进程?
multiprocessing
很有趣。它比多线程更强大,但也有一些注意事项。首先,您实际上正在运行一个完全不同的 Python 解释器。这意味着全局变量等将为您运行的每个进程获取一个新副本。
根据您的操作系统和启动方法的选择,您的进程可能是“分叉”或“衍生”。一个“spawned”进程将重新开始,就像一个新的Python程序刚刚启动一样。 forked进程将从源进程获取变量的所有当前值,但它仍然会复制所有这些变量。如果不使用 multiprocessing
帮助程序之一进行显式同步,未来对任一进程的更改都不会影响另一个进程。
您可以使用 Manager
将信息从一个进程传递到另一个进程。
感谢Silvio 和 Selcuk
我在分叉之前存储了Process
并在
stop
处杀死了它:
class JobScheduler:
manager = None
loop = None
logger = _logger
process_lock = Lock()
JOBS = [ProductionStatusJob]
@classmethod
def start(cls) -> None:
cls.logger.info("Starting Up (1/2)")
cls.loop = Process(target=cls._loop)
cls.loop.start()
@classmethod
def _loop(cls) -> None:
loop = asyncio.get_event_loop()
loop.create_task(cls._run())
cls.logger.info("Startup Complete (2/2)")
loop.run_forever()
loop.close() # <= is probably never called
...
@classmethod
async def stop(cls) -> None:
cls.logger.info("Shutting Down (1/2)")
with cls.process_lock:
cls.loop.kill()
cls.logger.info("Shutdown Complete (2/2)")
cls.loop = None