TypeError: can't pickle _thread.lock objects

Problem description (0 votes, 6 answers)

I am trying to run two different functions at the same time using a shared queue and I get an error... How can I run two functions simultaneously with a shared queue? This is Python 3.6 on Windows 7.

from multiprocessing import Process
from queue import Queue
import logging

def main():
    x = DataGenerator()
    try:
        x.run()
    except Exception as e:
        logging.exception("message")


class DataGenerator:

    def __init__(self):
        logging.basicConfig(filename='testing.log', level=logging.INFO)

    def run(self):
        logging.info("Running Generator")
        queue = Queue()
        Process(target=self.package, args=(queue,)).start()
        logging.info("Process started to generate data")
        Process(target=self.send, args=(queue,)).start()
        logging.info("Process started to send data.")

    def package(self, queue): 
        while True:
            for i in range(16):
                datagram = bytearray()
                datagram.append(i)
                queue.put(datagram)

    def send(self, queue):
        byte_array = bytearray()
        while True:
            size_of_queue = queue.qsize()
            logging.info(" queue size %s", size_of_queue)
            if size_of_queue > 7:
                for i in range(1, 8):
                    packet = queue.get()
                    byte_array.append(packet)
                logging.info("Sending datagram ")
                print(str(datagram))
                byte_array(0)

if __name__ == "__main__":
    main()

The log shows the error. I tried running the console as administrator, but I get the same message...

INFO:root:Running Generator
ERROR:root:message
Traceback (most recent call last):
  File "test.py", line 8, in main
    x.run()
  File "test.py", line 20, in run
    Process(target=self.package, args=(queue,)).start()
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Miniconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects
Tags: python, python-3.x
6 Answers
51 votes

I had the same problem with Pool() in Python 3.6.3.

Error received:

TypeError: can't pickle _thread.RLock objects

Say we want to add some number num_to_add to each element of some list num_list in parallel. The code is schematically like this:

from multiprocessing import Pool, Manager

class DataGenerator:

    def __init__(self, num_list, num_to_add):
        self.num_list = num_list      # e.g. [4, 2, 5, 7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
        pool = Pool(processes=50)
        results = [pool.apply_async(self.run_parallel, (num, new_num_list))
                   for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

    def run_parallel(self, num, shared_new_num_list):
        new_num = num + self.num_to_add  # uses a class attribute
        shared_new_num_list.append(new_num)

The problem here is that the self in run_parallel() cannot be pickled because it is a class instance. Moving the parallelised function run_parallel() out of the class helps. But that alone is not the best solution: the function may still need class attributes such as self.num_to_add, and those then have to be passed in as arguments.

Solution:

from multiprocessing import Pool, Manager

def run_parallel(num, shared_new_num_list, to_add):  # to_add is passed as an argument
    new_num = num + to_add
    shared_new_num_list.append(new_num)

class DataGenerator:

    def __init__(self, num_list, num_to_add):
        self.num_list = num_list      # e.g. [4, 2, 5, 7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
        pool = Pool(processes=50)
        results = [pool.apply_async(run_parallel, (num, new_num_list, self.num_to_add))  # num_to_add is passed as an argument
                   for num in self.num_list]
        roots = [r.get() for r in results]
        pool.close()
        pool.terminate()
        pool.join()

The other suggestions above did not help me.
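A hypothetical driver for the corrected example (my addition, not part of the answer): on Windows the entry-point guard is required because multiprocessing spawns a fresh interpreter that re-imports the module, and run_parallel must stay at module top level so it can be pickled by name.

if __name__ == "__main__":
    DataGenerator([4, 2, 5, 7], 1)  # e.g. adds 1 to each element in a worker pool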


35 votes

You need to change

from queue import Queue

to

from multiprocessing import Queue

The root reason is that the former Queue is designed for the threading module, while the latter is for the multiprocessing.Process module: a queue.Queue carries internal thread locks, and a _thread.lock cannot be pickled when the queue is handed to a child process, whereas multiprocessing.Queue is built to be shared between processes.
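Applied to the question's code, a minimal sketch of that fix (moving the workers to module level is my own simplification; the essential change is the import):

from multiprocessing import Process, Queue

def package(queue):
    # producer: put 16 one-byte datagrams on the shared queue
    for i in range(16):
        queue.put(bytearray([i]))

def send(queue):
    # consumer: drain the 16 datagrams and print them
    for _ in range(16):
        print(queue.get())

if __name__ == "__main__":
    q = Queue()  # multiprocessing.Queue pickles cleanly into child processes
    Process(target=package, args=(q,)).start()
    Process(target=send, args=(q,)).start()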


11 votes

See "multiprocessing.Pool - PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed".

Move the queue to self rather than passing it as an argument to your functions package and send.
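A minimal sketch of that suggestion (my reconstruction, assuming the import is also switched to multiprocessing.Queue): with the queue stored on self, it travels to the child processes inside the pickled instance behind the bound methods instead of through args.

from multiprocessing import Process, Queue

class DataGenerator:

    def __init__(self):
        self.queue = Queue()  # shared multiprocessing.Queue held on self

    def run(self):
        # the bound methods carry the instance (and its queue) to the children
        Process(target=self.package).start()
        Process(target=self.send).start()

    def package(self):
        for i in range(16):
            self.queue.put(bytearray([i]))

    def send(self):
        for _ in range(16):
            print(self.queue.get())

if __name__ == "__main__":
    DataGenerator().run()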
    


3 votes

This problem can be caused by a variety of things. Here are two situations where I ran into it:

Incompatible packages
• Problem: for example, trying to use multiprocessing when another part of the code that eventually gets called also needs to create new processes, or is incompatible with being copied into a new process because it uses locks.
• Solution: my problem was trying to use a single connection to a MongoDB instance for all processes. Creating a new connection for each process resolved it.

Class instances
• Problem: trying to call pool.starmap from inside a class on another function in the class. Making it a static method, or having a function outside the class call it, did not work and gave the same error. A class instance cannot be pickled, so we need to create the instance after multiprocessing has started.
• Solution: what ended up working for me was splitting my class into two classes. The function you run through multiprocessing then needs to instantiate a new object of the class it belongs to and call the method right away, like this:


from multiprocessing import Pool

class B:
    ...
    def process_feature(self, idx, feature):
        # do stuff in the new process
        pass
    ...

def multiprocess_feature(process_args):
    # instantiated inside the worker, after the process has started
    b_instance = B()
    return b_instance.process_feature(*process_args)

class A:
    ...
    def process_stuff(self):
        ...
        with Pool(processes=num_processes, maxtasksperchild=10) as pool:
            results = pool.starmap(
                multiprocess_feature,
                [
                    (idx, feature)
                    for idx, feature in enumerate(features)
                ],
                chunksize=100,
            )
        ...
    ...
...

1 vote

Building on the first answer's example: you can also expose the instance through a module-level global (fakeSelf below), so the worker keeps reading the class attribute without receiving it as an argument:



fakeSelf = None

def run_parallel(num, shared_new_num_list, to_add):  # to_add stays in the signature but is no longer used
    new_num = num + fakeSelf.num_to_add              # the value is read through the module-level fakeSelf
    shared_new_num_list.append(new_num)

class DataGenerator:

    def __init__(self, num_list, num_to_add):
        globals()['fakeSelf'] = self
        self.num_list = num_list      # e.g. [4, 2, 5, 7]
        self.num_to_add = num_to_add  # e.g. 1
        self.run()

    def run(self):
        new_num_list = Manager().list()
        ...

(Note that this relies on the workers inheriting the module state, i.e. fork; under Windows' spawn start method the re-imported module's fakeSelf stays None.)

0 votes

The fix was a really simple two-line helper function. Not bad at all. I have simplified it to show the relevant part. The version from before produced the pickling error from the Pool; after the fix:

class MyClass:
    def create_if_not_exists(self, a, b, c) -> AsyncResult:
        # ...
        return pool.apply_async(self.create, (a, b, c))
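For illustration, a self-contained sketch of the same helper pattern (the worker create and the pool wiring are my own assumptions; I also hand the helper a module-level worker so the Pool object itself never ends up inside the pickle):

from multiprocessing import Pool
from multiprocessing.pool import AsyncResult

def create(a, b, c):
    # module-level worker: pickled by name, no instance attached
    return a + b + c

class MyClass:
    def __init__(self, pool):
        self.pool = pool

    def create_if_not_exists(self, a, b, c) -> AsyncResult:
        # the two-line helper: submit the job and hand back the AsyncResult
        return self.pool.apply_async(create, (a, b, c))

if __name__ == "__main__":
    with Pool(processes=2) as pool:
        result = MyClass(pool).create_if_not_exists(1, 2, 3)
        print(result.get())  # prints 6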
