我正在尝试使用dask.distributed
同时根据来自多个CSV文件的内容更新Postgresql数据库。理想情况下,我们会在N个工作人员之间分发CSV文件,其中每个工作人员都会将CSV文件内容插入数据库。但是,在向工作人员分发任务时使用Cannot pickle files that are not opened for reading
时会得到Client.map()
异常。
这是代码的精简版本:
def _work(csv_path):
db = Database() # encapsulates interaction w/ postgresql database
db.open()
count = 0
with csv_path.open('r') as csv_file:
reader = csv.DictReader(csv_file)
for record in reader:
db.insert(record)
count += 1
db.close()
return count
client = Client(processes=False)
csv_files = Path('/data/files/').glob('*.csv')
csv_futures = client.map(_work, csv_files) # error occurs here
for finished in as_completed(csv_futures):
count = finished.result()
print(count)
基于相关的stackoverflow和github问题,我成功地使用cloudpickle
来序列化和反序列化函数和参数。
cloudpickle.loads(cloudpickle.dumps(_work))
Out[69]: <function _work(csv_path)>
和
files = list(Path('/data/files/').glob('*.csv'))
files
Out[73]:
[PosixPath('/data/files/208.csv'),
PosixPath('/data/files/332.csv'),
PosixPath('/data/files/125.csv'),
PosixPath('/data/files/8.csv')]
cloudpickle.loads(cloudpickle.dumps(files))
Out[74]:
[PosixPath('/data/files/208.csv'),
PosixPath('/data/files/332.csv'),
PosixPath('/data/files/125.csv'),
PosixPath('/data/files/8.csv')]
所以,问题出在其他地方。
确切的例外是这样的:
File "/Users/may/anaconda/envs/eagle-i/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 841, in save_file
raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode)
_pickle.PicklingError: Cannot pickle files that are not opened for reading: a
通过调试器,我很好奇obj
是什么,这是这样的:
<_io.TextIOWrapper name='/tmp/logs/ei_sched.log' mode='a' encoding='UTF-8'>
在上面给出的示例代码片段中,我省略了对记录器的调用,而cloudpickle
就是这样抱怨的。在尝试使用dask来并行化此功能之前,日志记录是此功能的剩余工件。一旦我从传递给Client.map()
的函数中删除了日志记录调用,事情按预期工作。
顺便说一句,这是cloudpickle
的一个很好的捕获因为记录到单个文件不应该从dask worker中完成。