Why am I getting a RecursionError when using multiprocessing?

Question (2 votes, 1 answer)

I want to use multiprocessing to geocode a large list of addresses. I have the following code.

import multiprocessing
import geocoder

addresses = ['New York City, NY','Austin, TX', 'Los Angeles, CA', 'Boston, MA'] # and on and on

def geocode_worker(address):
    return geocoder.arcgis(address)

def main_process():
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    return pool.map(geocode_worker, addresses)

if __name__ == '__main__':
    main_process()

But it gives me this error.

Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/opt/anaconda3/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 470, in _handle_results
    task = get()
  File "/opt/anaconda3/lib/python3.7/multiprocessing/connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 599, in __getattr__
    if not self.ok:
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 536, in ok
    return len(self) > 0
  File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 422, in __len__
    return len(self._list)

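The last three frames of the traceback (the calls in base.py) repeat over and over, and the final line of the traceback is: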

RecursionError: maximum recursion depth exceeded while calling a Python object

Can anyone help me figure out why?

Tags: python multiprocessing geocode

Answer (4 votes)

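The problem is that the ArcgisQuery object returned by geocoder is not picklable, or more precisely, it cannot be unpickled. multiprocessing pickles the worker's return value to send it back to the main process, and the unpickling step there falls into an infinite loop because of how the class uses __getattr__. Internally, __getattr__ tries to access self.ok, which ultimately relies on self._list being defined. During unpickling it is not defined, because it is only set in __init__, and __init__ is not called when an object is unpickled. Since self._list is missing, Python falls back to __getattr__ to find it, which accesses self.ok again, and the cycle repeats until the recursion limit is hit.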
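The loop can be reproduced without geocoder at all. The sketch below uses FakeQuery, a hypothetical stand-in class written only for illustration (it is not geocoder's actual code), which copies the pattern seen in the traceback: __getattr__ calls self.ok, ok calls len(self), and __len__ reads self._list. The helper name round_trip_recurses is also made up for this example.

```python
import pickle


class FakeQuery:
    """Hypothetical stand-in for a geocoder result; not geocoder's real code."""

    def __init__(self, location):
        self.location = location
        self._list = [location]  # only ever set here, in __init__

    def __len__(self):
        return len(self._list)

    @property
    def ok(self):
        return len(self) > 0

    def __getattr__(self, name):
        # Runs only when normal attribute lookup fails. On a half-built
        # object: self.ok -> len(self) -> self._list is missing ->
        # __getattr__('_list') -> self.ok -> ... and so on.
        if not self.ok:
            return None
        raise AttributeError(name)


def round_trip_recurses(obj):
    """Return True if pickling then unpickling obj ends in a RecursionError."""
    data = pickle.dumps(obj)  # pickling a fully built object works
    try:
        pickle.loads(data)  # unpickling skips __init__, so _list is missing
    except RecursionError:
        return True
    return False


if __name__ == "__main__":
    print(round_trip_recurses(FakeQuery("Austin, TX")))
```

Pickling succeeds because the object is fully initialized at that point; it is only the freshly created, still empty object on the receiving side that triggers the loop.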

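You can work around this by not passing the ArcgisQuery object itself between the worker process and the main process, and passing only its underlying __dict__ instead. Then rebuild the ArcgisQuery object in the main process.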

import multiprocessing
import geocoder
from geocoder.arcgis import ArcgisQuery

addresses = ['New York City, NY','Austin, TX', 'Los Angeles, CA', 'Boston, MA'] # and on and on

def geocode_worker(address):
    out = geocoder.arcgis(address)
    return out.__dict__ # Only return the object's __dict__

def main_process():
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    l = pool.map(geocode_worker, addresses)
    out = []
    for d in l:
        q = ArcgisQuery(d['location'])  # location is a required constructor arg
        q.__dict__.update(d)  # Load the rest of our state into the new object
        out.append(q)
    return out

if __name__ == '__main__':
    print(main_process())

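If you don't actually need the whole ArcgisQuery object and only need some parts of it, you can also return just those parts directly from the worker process, which avoids the need for this hack.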
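A minimal sketch of that lighter approach might look like the following. The helper to_plain is a hypothetical name introduced here, and the attribute names address and latlng are assumptions about what the geocoder result exposes; substitute whichever fields you actually need.

```python
def to_plain(result, fields=("address", "latlng")):
    """Copy the named attributes of result into a plain, picklable dict.

    The default field names are assumptions about the geocoder result.
    """
    return {name: getattr(result, name, None) for name in fields}


def geocode_worker(address):
    # Imported lazily so to_plain can be used without geocoder installed.
    import geocoder

    # Only built-in types cross the process boundary, so pickle never
    # has to reconstruct an ArcgisQuery object.
    return to_plain(geocoder.arcgis(address))
```

This geocode_worker can be passed to pool.map exactly like the original one; the main process then receives a list of dicts rather than query objects.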

For what it's worth, it looks like geocoder could fix its pickling problem by implementing __getstate__ and __setstate__ on ArcgisQuery or its base class, like this:

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, state):
        self.__dict__.update(state)
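
To see why those two methods are enough, here is a sketch using PicklableQuery, a hypothetical stand-in class (again, not geocoder's real code) with the same __getattr__ pattern as in the traceback. Because __setstate__ is defined on the class, the unpickler finds it through normal lookup and never falls into __getattr__ on the empty object.

```python
import pickle


class PicklableQuery:
    """Hypothetical stand-in showing the __getstate__/__setstate__ fix."""

    def __init__(self, location):
        self.location = location
        self._list = [location]

    def __len__(self):
        return len(self._list)

    @property
    def ok(self):
        return len(self) > 0

    def __getattr__(self, name):
        # Same risky pattern as before: relies on self._list existing.
        if not self.ok:
            return None
        raise AttributeError(name)

    def __getstate__(self):
        return self.__dict__

    def __setstate__(self, state):
        # Found on the class by normal lookup, so __getattr__ is never
        # consulted while the object is still empty.
        self.__dict__.update(state)


if __name__ == "__main__":
    restored = pickle.loads(pickle.dumps(PicklableQuery("Boston, MA")))
    print(restored.location, restored.ok)
```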