我有一个多线程程序,我想让用户选择如何运行它们,串行、多线程或多核,至少在顶层。下面显示了一个可运行的演示,说明了我的程序的逻辑。
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from queue import Queue
# from multiprocessing import Queue
class Handler:
def __init__(self):
self.queue = Queue() # this object is essential
def put(self, item):
self.queue.put(item)
def run(self):
while True:
item = self.queue.get()
# do other things on the item ...
print(item)
class Runner:
def __init__(self, name):
self.name = name
self.a = Handler()
self.b = Handler()
def start(self):
self.a.put(f'{self.name}: hello a')
self.b.put(f'{self.name}: hello b')
with ThreadPoolExecutor() as exe:
futures = [exe.submit(r.run) for r in [self.a, self.b]]
for future in futures:
future.result()
# current implementation
def run_in_multi_thread():
rA = Runner('A')
rB = Runner('B')
rC = Runner('C')
with ThreadPoolExecutor() as exe:
futures = [exe.submit(r.start) for r in [rA, rB, rC]]
for future in futures:
future.result()
# how to implement this?
def run_in_multi_process():
rA = Runner('A')
rB = Runner('B')
rC = Runner('C')
with ProcessPoolExecutor() as exe:
futures = [exe.submit(r.start) for r in [rA, rB, rC]]
for future in futures:
future.result()
if __name__ == '__main__':
# run_in_multi_thread() # this is currently running fine
run_in_multi_process() # how to make this work as well?
我的目标很简单,我想把很多
Runner
放到不同的进程中真正并行运行。
问题是,当我尝试将最外层的
ThreadPoolExecutor
更改为 ProcessPoolExecutor
时,python 总是引发 TypeError: cannot pickle '_thread.lock' object
.
谷歌搜索后,我知道这是因为我在我所有的处理程序中都使用了
queue.Queue
,它使用了threading.Lock
,这是一个不可腌制的类。但是,我无法避免使用它们,因为核心功能都由queue.Queue
和threading.Event
支持,供所有Handlers进行通信。
我也试过用
queue.Queue
代替multiprocessing.Queue
,但是这次它提高了RuntimeError: Queue objects should only be shared between processes through inheritance
。我也听说过第三方库,例如dill
或pathos
,但它会导致其他酸洗问题,所以我最终坚持使用内置库。
欢迎任何关于如何重构我的代码的建议。
我自己也遇到过同样的问题。标准答案是:
def init_pool_processes(q):
global queue
queue = q
...
def main(tasks):
queue = Queue()
with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
...
这将允许所有线程共享同一个队列。
请注意,如所写,您的代码将永远不会退出。你的
Handler.run
方法永远不会退出,所以你的未来永远不会有价值。你需要某种方式让你的跑步者知道没有更多的工作要跑,而且永远不会有更多的工作添加。
同步原语确实需要在多处理上下文中通过继承传递。
mpire
库使用 shared_objects
功能支持此功能。见docs.
from multiprocessing import Queue
from mpire import WorkerPool
class Handler: ...
class Runner: ...
def task(runners, idx):
runners[idx].start()
if __name__ == '__main__':
runners = [Runner('A'), Runner('B'), Runner('C')]
with WorkerPool(n_jobs=3, shared_objects=runners,
start_method='fork') as pool:
pool.map(task, range(len(runners)), chunk_size=1)
共享对象由mpire继承传递,解决你的问题。如果您需要通过多个进程进行通信,则必须使用
multiprocessing.Queue
。
您可以将
start_method
更改为'threading'
,它将使用线程而不是默认的'fork'
(在unix上)或'spawn'
在Windows上。
请注意,
mpire
将很快发布对 apply
/apply_async
的支持,您可以将其更改为:
runners = [Runner('A'), Runner('B'), Runner('C')]
with WorkerPool(n_jobs=3, shared_objects=runners,
start_method='fork') as pool:
futures = [pool.apply_async(task, idx)
for idx in range(len(runners))]
for future in futures:
future.get()
然而,Frank Yellin 说的是真的。你的
run
方法永远不会退出,所以你可能想看看那部分。
经过反复试验,我终于想出了一个可行的解决方案。我想我也可以回答我自己的问题。我已经放弃使用
ProcessPoolExecutor
了。现在我直接使用multiprocessing.Process
。下面是修改后的演示代码,描述了它如何在我的用例中工作。
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Process
from queue import Queue
from time import sleep
class Handler:
def __init__(self):
self.queue = Queue() # this object is essential
def put(self, item):
self.queue.put(item)
def run(self):
while True:
item = self.queue.get()
if item == 'exit':
break
# do other things on the item ...
print(item)
sleep(1)
class Runner:
def __init__(self, name):
self.name = name
self.a = Handler()
self.b = Handler()
def start(self):
# some dummy messages
for _ in range(3):
self.a.put(f'{self.name}: hello a')
self.b.put(f'{self.name}: hello b')
# request to shutdown gracefully
self.a.put('exit')
self.b.put('exit')
with ThreadPoolExecutor() as exe:
futures = [exe.submit(r.run) for r in [self.a, self.b]]
for f in futures:
f.result()
# this requires everything to be picklable
def run_in_process_pool():
rA = Runner('A')
rB = Runner('B')
rC = Runner('C')
with ProcessPoolExecutor() as exe:
futures = [exe.submit(r.start) for r in [rA, rB, rC]]
for future in futures:
future.result()
# this does not pickle anything, but why?
def run_in_processes():
rA = Runner('A')
rB = Runner('B')
rC = Runner('C')
procs = [Process(target=r.start) for r in [rA, rB, rC]]
for p in procs:
p.start()
for p in procs:
p.join()
if __name__ == '__main__':
# run_in_process_pool() # `TypeError: cannot pickle '_thread.lock' object`
run_in_processes() # this is working
令人惊讶的是,这段代码不会再抱怨类不可 pickleable 了!最后我只是使用旧的时尚方式(即创建流程 -> 开始 -> 加入)。总运行时间从 4:39 减少到 1:05(我的 cpu 有 4 个内核,真正的并行运行时间减少到 1/4 左右是非常合理的)。下次如果你有一些多线程代码,在后台使用
queue.Queue
或 threading.Lock
,你可以考虑直接使用 multiprocessing.Process
包装它们。
但是我仍然想知道为什么只有
ProcessPoolExecutor
要求所有东西都可以腌制而Process 不需要。据我所知,Process会调用系统调用来创建一个新进程,具体实现依赖于os.因为我在运行 WSL2 的 Windows 机器上,所以我想我在 Unix 上,默认启动方法应该是“fork”,它只是克隆整个过程而不需要 pickle 任何东西?