Python:模块中的multiprocessing Queue.put()不会向父进程发送任何内容

问题描述 投票:0回答:1

我正在尝试使用Python中的多处理程序包,更确切地说是Queue()类,使2个进程之间相互通信。我想从父进程中每5秒获取一个子进程的更新值。该子进程是一个类函数。我做了一个玩具示例,说明一切正常。

但是,当我尝试在我的项目中实现此解决方案时,子模块中子进程的Queue.put()方法似乎不会向父进程发送任何内容,因为父进程不会t打印所需的值,代码永不停止运行。实际上,父进程仅打印发送给子进程的值,此处为True,但是正如我所说的,它永远不会停止。

所以我的问题是:

1)我的玩具示例是否有任何错误?

2)我应该如何修改我的项目以使其像我的玩具示例一样正常工作?

考虑到它阻碍了我项目的最后阶段,任何帮助将不胜感激。

玩具示例:作品

主模块

from multiprocessing import Process, Event, Lock, Queue, Pipe
import time 
import test_mod as test

def loop(output):
    stop_event = Event()
    q = Queue()
    child_process = Process(target=test.child.sub, args=(q,))
    child_process.start()
    i = 0
    print("started at {} ".format(time.time()))

    while not stop_event.is_set():
        i+=1
        time.sleep(5)
        q.put(True)
        print(q.get())
        if i == 5:
            child_process.terminate()
            stop_event.set()

    output.put("main process looped")

if __name__ == '__main__':
    stop_event, output = Event(), Queue()
    k = 0
    while k < 5:
        loop_process = Process(target=loop, args=(output,))
        loop_process.start()
        print(output.get())
        loop_process.join()
        k+=1

子模块

from multiprocessing import Process, Event, Lock, Queue, Pipe
import time


class child(object):
    def __init__(self):
        pass

    def sub(q):
        i = 0
        while i < 2000:
            latest_value = time.time()
            accord = q.get()
            if accord == True:
                q.put(latest_value)
            accord = False
            time.sleep(0.0000000005)
            i+=1

项目代码:无效

主模块

import neat #package in which the submodule is 
import *some other stuff*

def run(config_file):

    config = neat.Config(some configuration)

    p = neat.Population(config)

    **WHERE MY PROBLEM IS**

    stop_event = Event()
    q = Queue()
    pe = neat.ParallelEvaluator(**args)

    child_process = Process(target=p.run, args=(pe.evaluate, q, other args))
    child_process.start()

    i = 0
    while not stop_event.is_set():

        q.put(True)
        print(q.get())
        time.sleep(5)
        i += 1
        if i == 5:
            child_process.terminate()
            stop_event.set()

if __name__ == '__main__':
    run(config_file)

子模块

class Population(object):
    def __init__():
      *initialization*

    def run(self, q, other args):

        while n is None or k < n:
            *some stuff*
            accord = add_2.get()
            if accord == True:
                add_2.put(self.best_genome.fitness)
            accord = False

        return self.best_genome

注意:1)我不习惯多处理

2)鉴于整个代码太长,我试图给出项目中最相关的部分,但是如果不清楚,请告诉我,我将更新我的问题。

3)我也考虑过使用Pipe(),但是此选项也不起作用。

python module multiprocessing queue communication
1个回答
1
投票

如果我没看错,则所需的子模块为类Population。但是,您使用类型为ParallelEvaluator的参数启动过程。接下来,我看不到您向子流程提供了队列q。这就是我从提供的代码中看到的内容:

stop_event = Event()
q = Queue()
pe = neat.ParallelEvaluator(**args)

child_process = Process(target=p.run, args=(pe.evaluate, **args)
child_process.start()

此外,以下几行创建了竞争条件:

q.put(True)
print(q.get())

get命令就像pop。因此,它需要一个元素并将其从队列中删除。如果您的子进程无法访问这两行之间的队列(因为它很忙),则True将永远不会进入子进程。因此,最好两个使用多个队列。每个方向一个。类似于:

stop_event = Event()
q_in = Queue()
q_out = Queue()
pe = neat.ParallelEvaluator(**args)

child_process = Process(target=p.run, args=(pe.evaluate, **args))
child_process.start()

i = 0
while not stop_event.is_set():

     q_in.put(True)
     print(q_out.get())
     time.sleep(5)
     i += 1
     if i == 5:
         child_process.terminate()
         stop_event.set()

这是您的子模块

class Population(object):
    def __init__():
      *initialization*

    def run(self, **args):

        while n is None or k < n:
            *some stuff*
            accord = add_2.get()           # add_2 = q_in
            if accord == True:
                add_3.put(self.best_genome.fitness)  #add_3 = q_out
            accord = False

        return self.best_genome
© www.soinside.com 2019 - 2024. All rights reserved.