Memory leak(?) in a multithreaded Python project


I have a small project (keep in mind that I am only a Python beginner). The project consists of a few smaller .py files. First there is main.py, which looks like this:

import os

from Controller import Controller
import config as cfg

if __name__ == "__main__":
    # Create the configured directories if they do not exist yet
    for path in cfg.paths.values():
        if not os.path.exists(path):
            os.system(f"mkdir {path} -p")
    Con = Controller()
    Con.start()

So this program just creates a few directories, creates the Controller object, and runs its start method.
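For reference, the same directory setup can be done without shelling out to mkdir; a minimal sketch using only the standard library (assuming the same cfg.paths dict):

import os
import config as cfg

for path in cfg.paths.values():
    # os.makedirs with exist_ok=True behaves like `mkdir -p`
    os.makedirs(path, exist_ok=True)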

Controller.py looks like this:

import logging
import multiprocessing
import time

import watchdog
import watchdog.events
import watchdog.observers
import watchdog.utils.dirsnapshot
import concurrent.futures
from AM import AM
import config as cfg

m = multiprocessing.Manager()
q = m.Queue()

class Handler(watchdog.events.PatternMatchingEventHandler):
    def __init__(self):
        # Set the patterns for PatternMatchingEventHandler
        watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['TEST*'],
                                                            ignore_directories=True, case_sensitive=False)

    def on_created(self, event):
        logging.info("AM Watchdog received created event - %s." % event.src_path)
        q.put(event.src_path)

    def on_moved(self, event):
        logging.info("AM Watchdog received moved event - %s." % event.src_path)
        q.put(event.src_path)

class Controller:
    def __init__(self):
        pass

    def _start_func(self, newFname):
        try:
            res = AM(f"{newFname}").start()
            return res
        except:
            return 1

    def start(self):
        event_handler = Handler()
        observer = watchdog.observers.Observer()
        observer.schedule(event_handler, path=cfg.paths["ipath"], recursive=True)
        observer.start()
        
        try:
            while True:
                time.sleep(1)
                with concurrent.futures.ThreadPoolExecutor(max_workers=cfg.workers) as executor:
                    futures = {}
                    while not q.empty():
                        newFname = q.get()
                        futures_to_work = executor.submit(self._start_func, newFname)
                        futures[futures_to_work] = newFname

                    for future in concurrent.futures.as_completed(futures):
                        name = futures.pop(future)
                        print(f"{name} completed")
                    
        except KeyboardInterrupt:
            observer.stop()
        observer.join()

This program is more complicated than the previous one (and probably has some issues). Its purpose is to watch a directory (cfg.paths["ipath"]) and wait for TEST* files to appear. When one does, its name is added to the queue. While the queue is not empty, new futures are created with concurrent.futures.ThreadPoolExecutor and the names are passed to the _start_func method. This method creates a new object from AM.py and runs it. The thinking behind this is that I want a program that waits for TEST* files to appear and then does something with them, while being able to handle several files at once and process them in the order in which they appear.
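For context, this is the submit/as_completed pattern that the loop above relies on; a minimal standalone sketch (the file names, delays and worker count are invented), showing that as_completed yields futures in completion order rather than submission order:

import concurrent.futures
import time

def work(name, delay):
    # Stand-in for _start_func: pretend to process one file
    time.sleep(delay)
    return name

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(work, n, d): n for n, d in [("TEST1", 2), ("TEST2", 1)]}
    # Futures come back as they finish: TEST2 completes before TEST1 here
    for future in concurrent.futures.as_completed(futures):
        print(f"{futures[future]} completed")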
AM.py looks like this:

import subprocess

class AM():
    def __init__(self, fname):
        self.fname = fname

    def test_func(self, fname):
        # Run the external program on the detected file
        c = f"some_faulty_unix_program {fname}".split(" ")
        p = subprocess.run(c, capture_output=True, text=True)

        out, err = p.stdout, p.stderr
        if out:
            print(out)
        if err:
            print(err)
            return 1
        return 0

    def start(self):
        res = self.test_func(self.fname)
        return res

This program runs some unix program in a new process (on the file detected in Controller.py). That program frequently produces errors (because the TEST* files are not always valid). I don't think it matters what this program is, but just in case: it is solve-field from astrometry.net, and the TEST* files are images of the sky.
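Since the external program fails so often, checking its exit code and giving it a timeout may be more robust than looking only at stderr; a small sketch (the program name is the same placeholder as above, and the 300 s limit is made up):

import subprocess

def run_solver(fname):
    cmd = ["some_faulty_unix_program", fname]
    try:
        # timeout kills the child if it hangs instead of exiting with an error
        p = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    except subprocess.TimeoutExpired:
        return 1
    if p.stdout:
        print(p.stdout)
    if p.stderr:
        print(p.stderr)
    # The return code is the definitive failure signal
    return 0 if p.returncode == 0 else 1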

The whole project runs as a service, like this:

[Unit]
Description = test astrometry service
After = network.target

[Service]
Type = simple
ExecStart = /bin/bash -c "/home/project/main.py"
Restart = always
RestartSec = 2
TimeoutStartSec = infinity
User = root
Group = users
PrivateTmp = true
Environment = "PATH=/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/home/project"
[Install]
WantedBy = multi-user.target

When I enable this service and check it with systemctl status my_project.service, it takes about 76.0M of memory. I usually leave it running for the whole night (I have a system that takes a picture of the night sky every minute, and this project computes the astrometry of those pictures). The next morning, when I check with systemctl status, the memory is around 200-300M if there were no errors, and around 3.5G if something went wrong (for example, I moved a config file used by this UNIX program, so it produced errors from the start). Why does the memory grow like this? Is it caused by a problem in my code, or by the unix program?
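One way to narrow this down is to log the Python process's own memory use periodically (for example once per iteration of the while True loop in Controller.start) and compare it with what systemctl status reports, since the latter covers the whole service cgroup including child processes. A minimal sketch using only the standard library; the helper name and call site are my own:

import logging
import resource

def log_memory():
    # Peak resident set size of this Python process; Linux reports it in KB
    peak_mb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024
    logging.info("Peak RSS: %.1f MB", peak_mb)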

python multithreading memory subprocess concurrent.futures
1 Answer

It is not clear to me where the memory leak is occurring. If it is in the "some_faulty_unix_program" that is run in AM.test_func, then you will need to find or create a replacement for it. But I believe a few simplifications/optimizations can be made to the code to reduce the possibility of a memory leak occurring elsewhere.

First, I don't think you need to re-create the thread pool over and over again. It appears that watchdog uses multithreading, so you could use the more efficient queue.Queue instead of a managed queue. But ultimately I think that, with some refactoring of the Controller.py code so that your handler submits tasks to the thread pool, you can eliminate the explicit queue altogether. Would the following work?

import concurrent.futures
import logging
from threading import Event

import watchdog
import watchdog.events
import watchdog.observers
import watchdog.utils.dirsnapshot

from AM import AM
import config as cfg

class Handler(watchdog.events.PatternMatchingEventHandler):
    def __init__(self):
        # Create multithreading pool just once:
        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=cfg.workers)
        # Set the patterns for PatternMatchingEventHandler
        watchdog.events.PatternMatchingEventHandler.__init__(self, patterns=['TEST*'],
                                                             ignore_directories=True, case_sensitive=False)

    def on_created(self, event):
        logging.info("AM Watchdog received created event - %s.", event.src_path)
        self._run_start_func(event.src_path)

    def on_moved(self, event):
        logging.info("AM Watchdog received moved event - %s.", event.src_path)
        self._run_start_func(event.src_path)

    def _run_start_func(self, newFname):
        future = self._executor.submit(self._start_func, newFname)
        future.result()  # Wait for completion
        print(f"{newFname} completed")

    def _start_func(self, newFname):
        try:
            res = AM(newFname).start()
            return res
        except:
            return 1

class Controller:
    def __init__(self):
        pass

    def start(self):
        event_handler = Handler()
        observer = watchdog.observers.Observer()
        observer.schedule(event_handler, path=cfg.paths["ipath"], recursive=True)
        observer.start()
        event = Event()
        try:
            # Block until keyboard interrupt:
            event.wait()
        except KeyboardInterrupt:
            observer.stop()
        observer.join()
    