我希望使用多进程对一个大型地址列表进行地理编码。我有以下代码。
import multiprocessing
import geocoder
addresses = ['New York City, NY','Austin, TX', 'Los Angeles, CA', 'Boston, MA'] # and on and on
def geocode_worker(address):
return geocoder.arcgis(address)
def main_process():
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
return pool.map(geocode_worker, addresses)
if __name__ == '__main__':
main_process()
但它给了我这个错误。
Traceback (most recent call last):
File "/opt/anaconda3/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/opt/anaconda3/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/opt/anaconda3/lib/python3.7/multiprocessing/pool.py", line 470, in _handle_results
task = get()
File "/opt/anaconda3/lib/python3.7/multiprocessing/connection.py", line 251, in recv
return _ForkingPickler.loads(buf.getbuffer())
File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 599, in __getattr__
if not self.ok:
File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 536, in ok
return len(self) > 0
File "/opt/anaconda3/lib/python3.7/site-packages/geocoder/base.py", line 422, in __len__
return len(self._list)
最后三行错误不断重复 然后最后一行回溯是:
RecursionError: maximum recursion depth exceeded while calling a Python object
谁能帮我找出原因?
问题是 ArcgisQuery
返回的对象 geocoder
是不可picklable的--或者说,它是不可unpicklable的。unpickle过程由于使用了 __getattr__
,它在内部试图访问 self.ok
,最终依靠的是 self._list
要定义的,而在unpickling时没有定义,因为它只在 __init__
和 __init__
解除腌制时不调用. 由于它没有被定义,所以它试图使用 __getattr__
找到它,它试图访问 self.ok
再次,并创建无限循环。
你可以通过不传递 ArcgisQuery
对象本身,而只在worker进程和主进程之间传递其底层的 __dict__
. 然后,重建 ArcgisQuery
对象的主流程。
import multiprocessing
import geocoder
from geocoder.arcgis import ArcgisQuery
addresses = ['New York City, NY','Austin, TX', 'Los Angeles, CA', 'Boston, MA'] # and on and on
def geocode_worker(address):
out = geocoder.arcgis(address)
return out.__dict__ # Only return the object's __dict__
def main_process():
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
l = pool.map(geocode_worker, addresses)
out = []
for d in l:
q = ArcgisQuery(d['location']) # location is a required constructor arg
q.__dict__.update(d) # Load the rest of our state into the new object
out.append(q)
return out
if __name__ == '__main__':
print(main_process())
如果你实际上不需要整个 ArcgisQuery
对象,并且只需要其中的一些部分,你也可以直接从worker进程中返回这些部分,以避免这个黑客的需要。
值得一提的是,它看起来就像是 geocoder
可以通过实现 __getstate__
和 __setstate__
在ArcgisQuery或其基类上,像这样。
def __getstate__(self):
return self.__dict__
def __setstate__(self, state):
self.__dict__.update(state)