我正在尝试使用 concurrent.futures
读数 .json
文件并将结果存储在一个列表中,下面是我的代码。
data = []
PATH_TO_JSON = '/media/My Passport/echo2_1'
def jsonread (file):
path = PATH_TO_JSON + '/' + file
with open(path, 'r', encoding='utf-8') as f:
for line in f:
data.append(json.loads(line.rstrip('\n|\r')))
with concurrent.futures.ProcessPoolExecutor() as executor:
files = [file for file in listdir(PATH_TO_JSON)]
results = executor.map(jsonread, files)
但代码返回一个空的 data
列表。我发现一个类似的问题 此处. 但我无法将答案应用到我的工作中。我希望得到一个详细的答案,因为我是一个新手,我的工作是在 multiprocessing
和 concurrent.futures
.
每个进程都会以自己的内存开始。这使得在全局列表中很难共享数据。相反,可以尝试重构 jsonread
方法来代替返回一个列表。
def jsonread(file):
data = []
path = PATH_TO_JSON + '/' + file
with open(path, 'r', encoding='utf-8') as f:
for line in f:
data.append(json.loads(line.rstrip('\n|\r')))
return data
对于多进程代码,你必须确保Python解释器能够无副作用地导入主模块。每个进程都会启动一个解释器来加载主模块,所以你必须确保只有主进程 (第一个解释器) 会启动 ProcessPoolExecutor
. 重构你的代码,包括一个围绕着 with
语句。
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
files = [file for file in listdir(PATH_TO_JSON)]
results = executor.map(jsonread, files)
如果我们要打印 results
最后我们得到。
print(results)
<generator object _chain_from_iterable_of_lists at 0x000001B45FD4A200>
你看到的是我们有一个生成器 在python中,生成器是懒惰的,它们不会做任何事情,直到你调用了 next
在他们身上。因为我们改变了 jsonread
来返回一个字符串列表,我们还必须将它们连接在一起。循环处理 jsonread
将隐性地调用 next
在发电机上。最后的代码。
PATH_TO_JSON = '/media/My Passport/echo2_1'
def jsonread(file):
data = []
path = PATH_TO_JSON + '/' + file
with open(path, 'r', encoding='utf-8') as f:
for line in f:
data.append(json.loads(line.rstrip('\n|\r')))
return data
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
files = [file for file in listdir(PATH_TO_JSON)]
results = executor.map(jsonread, files)
data = [line for result in results for line in result]