如何使 Python 线程程序(带锁)在多进程上运行?

问题描述 投票:0回答:3

我有一个多线程程序,我想让用户选择如何运行它们,串行、多线程或多核,至少在顶层。下面显示了一个可运行的演示,说明了我的程序的逻辑。

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from queue import Queue
# from multiprocessing import Queue


class Handler:
    def __init__(self):
        self.queue = Queue()  # this object is essential

    def put(self, item):
        self.queue.put(item)

    def run(self):
        while True:
            item = self.queue.get()
            # do other things on the item ...
            print(item)


class Runner:
    def __init__(self, name):
        self.name = name
        self.a = Handler()
        self.b = Handler()

    def start(self):
        self.a.put(f'{self.name}: hello a')
        self.b.put(f'{self.name}: hello b')
        with ThreadPoolExecutor() as exe:
            futures = [exe.submit(r.run) for r in [self.a, self.b]]
        for future in futures:
            future.result()


# current implementation
def run_in_multi_thread():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ThreadPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


# how to implement this?
def run_in_multi_process():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ProcessPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


if __name__ == '__main__':
    # run_in_multi_thread()  # this is currently running fine
    run_in_multi_process()  # how to make this work as well?

我的目标很简单,我想把很多

Runner
放到不同的进程中真正并行运行。

问题是,当我尝试将最外层的

ThreadPoolExecutor
更改为
ProcessPoolExecutor
时,python 总是引发
TypeError: cannot pickle '_thread.lock' object
.

谷歌搜索后,我知道这是因为我在我所有的处理程序中都使用了

queue.Queue
,它使用了
threading.Lock
,这是一个不可腌制的类。但是,我无法避免使用它们,因为核心功能都由
queue.Queue
threading.Event
支持,供所有Handlers进行通信。

我也试过用

queue.Queue
代替
multiprocessing.Queue
,但是这次它提高了
RuntimeError: Queue objects should only be shared between processes through inheritance
。我也听说过第三方库,例如
dill
pathos
,但它会导致其他酸洗问题,所以我最终坚持使用内置库。

欢迎任何关于如何重构我的代码的建议。

python multithreading multiprocessing queue locking
3个回答
1
投票

我自己也遇到过同样的问题。标准答案是:

def init_pool_processes(q):
    global queue
    queue = q

...

def main(tasks):
    queue = Queue()
    with Pool(initializer=init_pool_processes, initargs=(queue,)) as pool:
        ...

这将允许所有线程共享同一个队列。

请注意,如所写,您的代码将永远不会退出。你的

Handler.run
方法永远不会退出,所以你的未来永远不会有价值。你需要某种方式让你的跑步者知道没有更多的工作要跑,而且永远不会有更多的工作添加。


0
投票

同步原语确实需要在多处理上下文中通过继承传递。

mpire
库使用
shared_objects
功能支持此功能。见docs.

from multiprocessing import Queue
from mpire import WorkerPool

class Handler: ...
class Runner: ...

def task(runners, idx):
    runners[idx].start()

if __name__ == '__main__':
    runners = [Runner('A'), Runner('B'), Runner('C')]
    with WorkerPool(n_jobs=3, shared_objects=runners, 
                    start_method='fork') as pool:
        pool.map(task, range(len(runners)), chunk_size=1)

共享对象由mpire继承传递,解决你的问题。如果您需要通过多个进程进行通信,则必须使用

multiprocessing.Queue

您可以将

start_method
更改为
'threading'
,它将使用线程而不是默认的
'fork'
(在unix上)或
'spawn'
在Windows上。

请注意,

mpire
将很快发布对
apply
/
apply_async
的支持,您可以将其更改为:

runners = [Runner('A'), Runner('B'), Runner('C')]
    with WorkerPool(n_jobs=3, shared_objects=runners, 
                    start_method='fork') as pool:
        futures = [pool.apply_async(task, idx) 
                   for idx in range(len(runners))]
        for future in futures:
            future.get() 

然而,Frank Yellin 说的是真的。你的

run
方法永远不会退出,所以你可能想看看那部分。


0
投票

经过反复试验,我终于想出了一个可行的解决方案。我想我也可以回答我自己的问题。我已经放弃使用

ProcessPoolExecutor
了。现在我直接使用
multiprocessing.Process
。下面是修改后的演示代码,描述了它如何在我的用例中工作。

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from multiprocessing import Process
from queue import Queue
from time import sleep


class Handler:
    def __init__(self):
        self.queue = Queue()  # this object is essential

    def put(self, item):
        self.queue.put(item)

    def run(self):
        while True:
            item = self.queue.get()
            if item == 'exit':
                break
            # do other things on the item ...
            print(item)
            sleep(1)


class Runner:
    def __init__(self, name):
        self.name = name
        self.a = Handler()
        self.b = Handler()

    def start(self):
        # some dummy messages
        for _ in range(3):
            self.a.put(f'{self.name}: hello a')
            self.b.put(f'{self.name}: hello b')
        # request to shutdown gracefully
        self.a.put('exit')
        self.b.put('exit')
        with ThreadPoolExecutor() as exe:
            futures = [exe.submit(r.run) for r in [self.a, self.b]]
            for f in futures:
                f.result()


# this requires everything to be picklable
def run_in_process_pool():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ProcessPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


# this does not pickle anything, but why?
def run_in_processes():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    procs = [Process(target=r.start) for r in [rA, rB, rC]]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == '__main__':
    # run_in_process_pool()  # `TypeError: cannot pickle '_thread.lock' object`
    run_in_processes()  # this is working

令人惊讶的是,这段代码不会再抱怨类不可 pickleable 了!最后我只是使用旧的时尚方式(即创建流程 -> 开始 -> 加入)。总运行时间从 4:39 减少到 1:05(我的 cpu 有 4 个内核,真正的并行运行时间减少到 1/4 左右是非常合理的)。下次如果你有一些多线程代码,在后台使用

queue.Queue
threading.Lock
,你可以考虑直接使用
multiprocessing.Process
包装它们。

但是我仍然想知道为什么只有

ProcessPoolExecutor
要求所有东西都可以腌制而Process 不需要。据我所知,Process会调用系统调用来创建一个新进程,具体实现依赖于os.因为我在运行 WSL2 的 Windows 机器上,所以我想我在 Unix 上,默认启动方法应该是“fork”,它只是克隆整个过程而不需要 pickle 任何东西?

© www.soinside.com 2019 - 2024. All rights reserved.