“无法使用Client.map()来挑选未打开的文件”

问题描述 投票:1回答:1

我正在尝试使用dask.distributed同时根据来自多个CSV文件的内容更新Postgresql数据库。理想情况下,我们会在N个工作人员之间分发CSV文件,其中每个工作人员都会将CSV文件内容插入数据库。但是,在向工作人员分发任务时使用Cannot pickle files that are not opened for reading时会得到Client.map()异常。

这是代码的精简版本:

def _work(csv_path):
   db = Database() # encapsulates interaction w/ postgresql database
   db.open()

   count = 0

   with csv_path.open('r') as csv_file:
       reader = csv.DictReader(csv_file)

       for record in reader:
           db.insert(record)
           count += 1

   db.close()

   return count


client = Client(processes=False)

csv_files = Path('/data/files/').glob('*.csv')

csv_futures = client.map(_work, csv_files) # error occurs here

for finished in  as_completed(csv_futures):
   count = finished.result()
   print(count)

基于相关的stackoverflow和github问题,我成功地使用cloudpickle来序列化和反序列化函数和参数。

cloudpickle.loads(cloudpickle.dumps(_work))
Out[69]: <function _work(csv_path)>

files = list(Path('/data/files/').glob('*.csv'))
files
Out[73]: 
[PosixPath('/data/files/208.csv'),
 PosixPath('/data/files/332.csv'),
 PosixPath('/data/files/125.csv'),
 PosixPath('/data/files/8.csv')]
cloudpickle.loads(cloudpickle.dumps(files))
Out[74]: 
[PosixPath('/data/files/208.csv'),
 PosixPath('/data/files/332.csv'),
 PosixPath('/data/files/125.csv'),
 PosixPath('/data/files/8.csv')]

所以,问题出在其他地方。

python dask dask-distributed
1个回答
1
投票

确切的例外是这样的:

  File "/Users/may/anaconda/envs/eagle-i/lib/python3.6/site-packages/cloudpickle/cloudpickle.py", line 841, in save_file
    raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode)
_pickle.PicklingError: Cannot pickle files that are not opened for reading: a

通过调试器,我很好奇obj是什么,这是这样的:

<_io.TextIOWrapper name='/tmp/logs/ei_sched.log' mode='a' encoding='UTF-8'>

在上面给出的示例代码片段中,我省略了对记录器的调用,而cloudpickle就是这样抱怨的。在尝试使用dask来并行化此功能之前,日志记录是此功能的剩余工件。一旦我从传递给Client.map()的函数中删除了日志记录调用,事情按预期工作。

顺便说一句,这是cloudpickle的一个很好的捕获因为记录到单个文件不应该从dask worker中完成。

热门问题
推荐问题
最新问题