Python 多线程 - 未终止的进程

问题描述 投票:0回答:1

下面的代码是我根据我正在处理的代码建模的,并为了解决这个问题而进行了简化,它似乎没有正确使用 p.join() 来管理 p.start( 创建的进程) )导致内存泄漏,运行该代码的K8S pod资源耗尽。我有明显遗漏的东西吗?

import multiprocessing
import random
import time

def process_C(semaphore):
    try:
        sleep_time = random.random() * 6
        print("Process C sleeping for {:.2f} seconds".format(sleep_time))
        if sleep_time > 4.2:
            raise Exception("Random exception")
        time.sleep(sleep_time)
        print("Process C completed")
        semaphore.release()
    except Exception as e:
        print("Process C failed: {}".format(e))
        semaphore.release()


def process_B(semaphore):
        p = multiprocessing.Process(target=process_C, args=(semaphore,))
        semaphore.acquire()
        p.start()
        p.join(timeout=0)  # Set timeout to 0 to prevent blocking            
        print("Process B completed")
        
def process_A(semaphore):
        process_B(semaphore)    
        print("Process A completed")

if __name__ == "__main__":
    semaphore = multiprocessing.Semaphore(20)
    while True:
        process_A(semaphore)

我尝试检查进程是否还活着并一一终止它们,但没有成功。

python multithreading multiprocessing python-multiprocessing python-multithreading
1个回答
0
投票

主要问题是

p.join(timeout=0)
不会等待该过程完成。

它立即返回并进入下一个循环迭代,导致僵尸进程并导致内存泄漏。 要检查进程是否处于活动状态并终止它们(如果是),您应该跟踪每个子进程。 您可以将每个子进程附加到一个列表中,检查每个子进程是否还活着,然后终止它们。

def process_B(semaphore, list_of_child_processes):
    p = multiprocessing.Process(target=process_C, args=(semaphore,))
    semaphore.acquire()
    p.start()
    list_of_child_processes.append(p)  # Keep track of child processes
    print("Process B completed")

def process_A(semaphore, list_of_child_processes):
    process_B(semaphore, list_of_child_processes)

def terminate_processes(list_of_child_processes):
    """
    Call this function to terminate a process if it's alive

    """
    
    for p in list_of_child_processes:
        if p.is_alive():
            print("Terminating process:", p.pid)
            p.terminate()

然后

if __name__ == "__main__":
    semaphore = multiprocessing.Semaphore(20)
    list_of_child_processes = []
    while True:
        process_A(semaphore, list_of_child_processes)

根据您在 Pod 上运行的应用程序,您可以决定如何调用

terminate_processes
函数。

© www.soinside.com 2019 - 2024. All rights reserved.