我有一个类(MyClass),它包含一个需要运行的动作队列(self.msg_queue),我有多个输入源,可以向队列中添加任务。
现在我有三个函数,我想同时运行。
流程描述:在类加入网络后,我让它产生三个线程(上面的每个函数一个)。 每个线程函数从队列中添加项目,语法为 "self.msg_queue.put(message)",从队列中删除项目,语法为 "self.msg_queue.get_nowait()"。
问题描述:我遇到的问题是,似乎每个线程都在修改自己的队列对象(它们并没有共享队列,msg_queue,它们,函数,都是这个类的成员)。
我对多处理不够熟悉,不知道重要的错误信息是什么;但是,它在说明它不能拾取一个弱ref对象(它没有给出哪个对象是弱ref对象的说明),并且在queue.put()调用内有一行 "self._sem.acquire(block, timeout)产生一个'[WinError 5] Access is denied'"错误。是否可以认为这个故障在队列的引用没有正确复制过来?
我使用的是Python 3.7.2和多处理包的Process和Queue。
[我已经看到了多个关于让线程在类之间穿梭信息的QA--创建一个主线束,生成一个队列,然后把这个队列作为参数传递给每个线程。 如果函数不需要使用MyClass中的其他函数,我可以看到通过让这些函数接收队列并使用本地变量而不是类变量来调整这个策略。]
[我很有信心,这个错误不是把我的队列传给tkinter对象的结果,因为我的单元测试是关于我的GUI如何修改其调用者的队列,工作正常]
下面是一个队列出错的最小可重现的例子。
from multiprocessing import Queue
from multiprocessing import Process
import queue
import time
class MyTest:
def __init__(self):
self.my_q = Queue()
self.counter = 0
def input_function_A(self):
while True:
self.my_q.put(self.counter)
self.counter = self.counter + 1
time.sleep(0.2)
def input_function_B(self):
while True:
self.counter = 0
self.my_q.put(self.counter)
time.sleep(1)
def output_function(self):
while True:
try:
var = self.my_q.get_nowait()
except queue.Empty:
var = -1
except:
break
print(var)
time.sleep(1)
def run(self):
process_A = Process(target=self.input_function_A)
process_B = Process(target=self.input_function_B)
process_C = Process(target=self.output_function)
process_A.start()
process_B.start()
process_C.start()
# without this it generates the WinError:
# with this it still behaves as if the two input functions do not modify the queue
process_C.join()
if __name__ == '__main__':
test = MyTest()
test.run()
事实上--这些不是 "线程"--这些是 "进程"--而如果你使用的是多线程,而不是多进程,那么你的 self.my_q
实例将是 同一对象,放置在计算机的同一内存空间,多处理做了一个 叉子 的进程,而原进程("运行 "调用中正在执行的进程)中的任何数据在使用时都会被复制--所以,每个子进程都会看到自己的 "队列 "实例,与其他进程无关。
让各种进程共享一个多进程.Queue对象的正确方法是把它作为参数传递给目标方法。更简单的方法是这样重组你的代码,使其工作。
from multiprocessing import Queue
from multiprocessing import Process
import queue
import time
class MyTest:
def __init__(self):
self.my_q = Queue()
self.counter = 0
def input_function_A(self, queue):
while True:
queue.put(self.counter)
self.counter = self.counter + 1
time.sleep(0.2)
def input_function_B(self, queue):
while True:
self.counter = 0
queue.put(self.counter)
time.sleep(1)
def output_function(self, queue):
while True:
try:
var = queue.get_nowait()
except queue.Empty:
var = -1
except:
break
print(var)
time.sleep(1)
def run(self):
process_A = Process(target=self.input_function_A, args=(queue,))
process_B = Process(target=self.input_function_B, args=(queue,))
process_C = Process(target=self.output_function, args=(queue,))
process_A.start()
process_B.start()
process_C.start()
# without this it generates the WinError:
# with this it still behaves as if the two input functions do not modify the queue
process_C.join()
if __name__ == '__main__':
test = MyTest()
test.run()
正如你所看到的,由于你的类实际上并没有通过实例的属性来共享任何数据,这种 "类 "的设计对于你的应用程序来说并没有什么意义--但对于在同一个代码块中对不同的工作者进行分组。
可以有一个神奇的多进程类,它有一些内部方法来实际启动worker-methods并共享Queue实例--所以如果你在一个项目中有很多这样的类,就会少很多模板。
类似的东西。
from multiprocessing import Queue
from multiprocessing import Process
import time
class MPWorkerBase:
def __init__(self, *args, **kw):
self.queue = None
self.is_parent_process = False
self.is_child_process = False
self.processes = []
# ensure this can be used as a colaborative mixin
super().__init__(*args, **kw)
def run(self):
if self.is_parent_process or self.is_child_process:
# workers already initialized
return
self.queue = Queue()
processes = []
cls = self.__class__
for name in dir(cls):
method = getattr(cls, name)
if callable(method) and getattr(method, "_MP_worker", False):
process = Process(target=self._start_worker, args=(self.queue, name))
self.processes.append(process)
process.start()
# Setting these attributes here ensure the child processes have the initial values for them.
self.is_parent_process = True
self.processes = processes
def _start_worker(self, queue, method_name):
# this method is called in a new spawned process - attribute
# changes here no longer reflect attributes on the
# object in the initial process
# overwrite queue in this process with the queue object sent over the wire:
self.queue = queue
self.is_child_process = True
# call the worker method
getattr(self, method_name)()
def __del__(self):
for process in self.processes:
process.join()
def worker(func):
"""decorator to mark a method as a worker that should
run in its own subprocess
"""
func._MP_worker = True
return func
class MyTest(MPWorkerBase):
def __init__(self):
super().__init__()
self.counter = 0
@worker
def input_function_A(self):
while True:
self.queue.put(self.counter)
self.counter = self.counter + 1
time.sleep(0.2)
@worker
def input_function_B(self):
while True:
self.counter = 0
self.queue.put(self.counter)
time.sleep(1)
@worker
def output_function(self):
while True:
try:
var = self.queue.get_nowait()
except queue.Empty:
var = -1
except:
break
print(var)
time.sleep(1)
if __name__ == '__main__':
test = MyTest()
test.run()