为什么在使用多处理时我的类属性没有被保留?

问题描述 投票:0回答:2

我在 FastAPI 应用程序中有以下类:

import asyncio
import logging
from multiprocessing import Lock, Process

from .production_status import Job as ProductionStatusJob


class JobScheduler:
    loop = None
    logger = logging.getLogger("job_scheduler")
    process_lock = Lock()
    JOBS = [ProductionStatusJob]

    @classmethod
    def start(cls) -> None:
        cls.logger.info("Starting Up (1/2)")
        Process(target=cls._loop).start()
    
    @classmethod
    def _loop(cls) -> None:
        cls.loop = asyncio.get_event_loop()
        cls.loop.create_task(cls._run())
        cls.logger.info("Startup Complete (2/2)")
        cls.loop.run_forever()
        cls.loop.close()

    @classmethod
    async def _run(cls) -> None:
        while True:
            ...

    @classmethod
    async def stop(cls) -> None:
        cls.logger.info("Shutting Down (1/2)")
        with cls.process_lock:
            cls.loop.stop()                          # <= This Line
            cls.loop.close()
        cls.logger.info("Shutdown Complete (2/2)")
        cls.loop = None

在 FastAPI 应用程序的

startup
shutdown
事件上,将调用
JobScheduler.start()
JobScheduler.stop()
方法。

start
方法运行顺利,但在
stop
中我收到错误:

File "/backend/app/main.py", line 146, in stop_job_scheduler
2023-08-16 11:46:27     await job_scheduler.stop()
2023-08-16 11:46:27   File "/backend/app/jobs/__init__.py", line 59, in stop
2023-08-16 11:46:27     cls.loop.stop()
2023-08-16 11:46:27 AttributeError: 'NoneType' object has no attribute 'stop'

但是

cls.loop
是在
_loop
方法期间设置的(在
start
结束时执行) - 那么为什么在调用
cls.loop
方法时
None
仍然具有其初始
stop
值?

当FastAPI应用程序调用

shutdown
时,有没有更好的方法来清理后台进程?

python multiprocessing fastapi event-loop class-variables
2个回答
2
投票
Python 中的

multiprocessing
很有趣。它比多线程更强大,但也有一些注意事项。首先,您实际上正在运行一个完全不同的 Python 解释器。这意味着全局变量等将为您运行的每个进程获取一个新副本。

根据您的操作系统和启动方法的选择,您的进程可能是“分叉”或“衍生”。一个“spawned”进程将重新开始,就像一个新的Python程序刚刚启动一样。 forked进程将从源进程获取变量的所有当前值,但它仍然会复制所有这些变量。如果不使用 multiprocessing 帮助程序之一进行显式同步,未来对任一进程的更改都不会影响另一个进程。 您可以使用 Manager

 在进程之间显式同步数据。这有点像两个进程都连接到的本地服务器。对于更明确的发布-订阅数据,您还可以使用 

Queue

 将信息从一个进程传递到另一个进程。
感谢 
Silvio

Selcuk

0
投票

我在分叉之前存储了Process并在

stop

处杀死了它:


class JobScheduler:
    manager = None
    loop = None
    logger = _logger
    process_lock = Lock()
    JOBS = [ProductionStatusJob]

    @classmethod
    def start(cls) -> None:
        cls.logger.info("Starting Up (1/2)")
        cls.loop = Process(target=cls._loop)
        cls.loop.start()
    
    @classmethod
    def _loop(cls) -> None:
        loop = asyncio.get_event_loop()
        loop.create_task(cls._run())
        cls.logger.info("Startup Complete (2/2)")
        loop.run_forever()
        loop.close() # <= is probably never called
...

    @classmethod
    async def stop(cls) -> None:
        cls.logger.info("Shutting Down (1/2)")
        with cls.process_lock:
            cls.loop.kill()
        cls.logger.info("Shutdown Complete (2/2)")
        cls.loop = None

    

© www.soinside.com 2019 - 2024. All rights reserved.