我有一个简单的任务。特定功能需求大量文件来运行。这个任务可以很容易地并行化。
这里是工作的代码:
# filelist is the directory containing two file, a.txt and b.txt.
# a.txt is the first file, b.xt is the second file
# I pass a file that lits the names of the two files to the main program
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import sys
def translate(filename):
print(filename)
f = open(filename, "r")
g = open(filename + ".x", , "w")
for line in f:
g.write(line)
def main(path_to_file_with_list):
futures = []
with ProcessPoolExecutor(max_workers=8) as executor:
for filename in Path(path_to_file_with_list).open():
executor.submit(translate, "filelist/" + filename)
for future in as_completed(futures):
future.result()
if __name__ == "__main__":
main(sys.argv[1])
然而,没有新的文件被创建,即该文件夹不包含a.txt.x和b.txt.x文件。
什么是错的,上面的代码,我如何使它发挥作用?
谢谢。
这应该让你在正确的道路上。如果它不工作,不是一个明显的bug,那么我怀疑你可能没有所有的文件路径正确的......我要指出的是写文件,将受益于线程不是从减少开销的过程了。文件I / O应该释放GIL所以你从加速(如果你一次复制多行显著更多。)这就是说,如果你只是复制文件,你应该只使用shutil.copy
或shutil.copy2
受益
from concurrent.futures import ProcessPoolExecutor, wait
from pathlib import Path
import sys
def translate(filename):
print(filename)
with open(filename, "r") as f, open(filename + ".x", , "w") as g:
for line in f:
g.write(line)
def main(path_to_file_with_list):
futures = []
with ProcessPoolExecutor(max_workers=8) as executor:
for filename in Path(path_to_file_with_list).open():
futures.append(executor.submit(translate, "filelist/" + filename))
wait(futures) #simplify waiting on processes if you don't need the result.
for future in futures:
if future.excpetion() is not None:
raise future.exception() #ProcessPoolEcecutors swallow exceptions without telling you...
print("done")
if __name__ == "__main__":
main(sys.argv[1])