我在这个代码中遇到了可腌制性问题(也附在下面)。我已阅读相关帖子[1][2]但我找不到有用的对应信息。您能给出这个错误的解释或解决方案吗?
下面是返回错误的代码部分:
pickle.PicklingError: Can't pickle <class '__main__.JobQueueManager'>: it's not found as __main__.JobQueueManager
谢谢!
def make_server_manager(port, authkey):
job_q = Queue.Queue()
result_q = Queue.Queue()
class JobQueueManager(SyncManager):
pass
JobQueueManager.register('get_job_q', callable=lambda: job_q)
JobQueueManager.register('get_result_q', callable=lambda: result_q)
manager = JobQueueManager(address=('', port), authkey=authkey)
manager.start()
print 'Server started at port %s' % port
return manager
PS:Python 2.7.7,Win 7
据我所知,要使此模式在 Windows 上工作,您需要创建一个可picklable
queue.Queue
。您可以通过创建定义 Queue
和 __setstate__
的
__getstate__
的子类来实现这一点,并让它仅腌制我们实际需要在进程之间发送的状态片段,并保留其他内容(不可腌制)内部锁)退出。
我们需要进行的其他更改是将自定义
Manager
类定义移至顶层,并且不使用 lambda
函数作为 callable
的参数。相反,我们使用 partial
和顶级函数,因为它可以被腌制。这是最终的代码:
import sys
from multiprocessing.managers import SyncManager
from functools import partial
import multiprocessing
from Queue import Queue as _Queue
class Queue(_Queue):
""" A picklable queue. """
def __getstate__(self):
# Only pickle the state we care about
return (self.maxsize, self.queue, self.unfinished_tasks)
def __setstate__(self, state):
# Re-initialize the object, then overwrite the default state with
# our pickled state.
Queue.__init__(self)
self.maxsize = state[0]
self.queue = state[1]
self.unfinished_tasks = state[2]
def get_q(q):
return q
class JobQueueManager(SyncManager):
pass
def make_server_manager(port, authkey):
job_q = Queue()
result_q = Queue()
job_q.put("hey")
JobQueueManager.register('get_job_q', callable=partial(get_q, job_q))
JobQueueManager.register('get_result_q', callable=partial(get_q, result_q))
manager = JobQueueManager(address=('', port), authkey=authkey)
#manager.start()
print('Server started at port %s' % port)
return manager
def make_client_manager(port, authkey):
JobQueueManager.register('get_job_q')
JobQueueManager.register('get_result_q')
manager = JobQueueManager(address=('localhost', port), authkey=authkey)
manager.connect()
queue = manager.get_job_q()
print("got queue {}".format(queue))
print(queue.get_nowait())
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--client":
make_client_manager(50000, 'abcdefg')
else:
manager = make_server_manager(50000, "abcdefg")
server = manager.get_server()
server.serve_forever()
您需要有
Queue.Queue
可腌制,以及您的 lambda
函数和 JobQueueManager
。
要做到这一点,我想你可以很懒,你所需要做的就是获得
dill
包和import dill
。
我没有在 Windows 上进行测试,但它应该按如下方式工作。
dill
可在此处获取:https://github.com/uqfoundation。
>>> import dill
>>> import Queue
>>> from multiprocessing.managers import SyncManager
>>>
>>> def make_server_manager(port, authkey):
... job_q = Queue.Queue()
... result_q = Queue.Queue()
... class JobQueueManager(SyncManager):
... pass
... JobQueueManager.register('get_job_q', callable=lambda: job_q)
... JobQueueManager.register('get_result_q', callable=lambda: result_q)
... manager = JobQueueManager(address=('',port), authkey=authkey)
... manager.start()
... print "server started at port %s" % port
... return manager
...
>>> sm = make_server_manager(12345, 'foofoo')
server started at port 12345
multiprocessing
库为您提供了一个开箱即用的解决方案 - multiprocessing.Queue
,它应该可以在任何地方自动picklable,甚至在Windows上(并且可以追溯到2.7)。
尝试使
Queue.Queue
可腌制对我来说似乎是个坏主意。您不会获得可以从两个不同进程中使用的一个队列 - 您将在另一个进程中获得该队列的独立副本。
如果您想在另一个进程中获得队列当前状态的副本,则将队列中的所有元素提取为免费pickle的普通旧列表的工作量会少得多(如果所有元素都是可pickle的) ),将列表发送过来,然后在另一侧重新构建一个新的
Queue.Queue
。
另外,正如我想您现在已经发现的那样,您无法腌制本地 lambda - 这将如何工作?相反,创建一个对该命名空间全局的函数,并将该全局函数与所需的数据一起发送过来。
尝试:
class JobQueueManager(SyncManager):
pass
def make_server_manager(port, authkey):
job_q = Queue.Queue()
result_q = Queue.Queue()
JobQueueManager.register('get_job_q', callable=lambda: job_q)
JobQueueManager.register('get_result_q', callable=lambda: result_q)
manager = JobQueueManager(address=('', port), authkey=authkey)
manager.start()
print 'Server started at port %s' % port
return manager
将类的定义移动到 pickle 可以找到它的位置应该允许 pickle。 Pickle 将在
__main__
模块中查找该类,但使用您的代码,它找不到它,因为它位于函数内部。
但是,正如评论中所指出的,管理器不需要进行 pickle,因此必须有另一个对象将其拖入,例如在其全局变量中包含管理器的函数。
完整的工作示例在这里, 队列服务器
您将其导入为 _Queue
from Queue import Queue as _Queue
并将其称为 Queue()。
def make_server_manager(port, authkey):
job_q = Queue()
result_q = Queue()
job_q.put("hey")
我遇到了其他一些严重的选择性问题,但相比之下,我遇到了问题,我致力于将其切换到主线程和一个用于提供队列项目和客户端来订阅和完成工作的运行程序类。它需要一些整洁的工作,但工作顺利。
我花了大约 2 天的时间来计划和编写脚本,但在看到它的效果后是值得的。
from queue import Queue
if __name__ == '__main__':
try:
server_address = ('localhost', 8624)
get_queue = result_queue = Queue()
authkey = bytes("769ac424-adb6-5a73-83b0-d22eb27e543b", 'utf-8') #comment out this line for random apikey
'''w = Worker(queue) #QueueSupplyWorker(authkey, queue) #a worker object using the same queue
w.start()'''
threading.Thread(target=start_supplyrunner, args=[get_queue,result_queue]).start()
#register queues statically
DistibutedServer.register('get_queue', callable=lambda: get_queue)
DistibutedServer.register('result_queue', callable=lambda: result_queue)
ds = DistibutedServer(address=server_address, authkey=authkey)
s = ds.get_server()
#serve_forever must be the last since it enters the loop anything after this won't execute until keyboardinterrupt
s.serve_forever()
except KeyboardInterrupt:
print("winding up..")
time.sleep(2)
print('bye, bye!')
完整的工作示例在这里, 队列服务器