Python处理列表/队列中的项目并保存进度

问题描述 投票:0回答:2

如果我在python中处理大约1000多万个小任务(转换图像左右),如何在处理崩溃的情况下创建队列并保存进度。要明确的是,如何保存进度或停止处理我想要的任何内容并从最后一点继续处理。

在这种情况下如何处理多个线程?

一般来说,问题是如何将已处理数据的进度保存到文件中。问题是,如果它是非常小的文件,每次迭代后保存文件将比处理本身更长...

谢谢!

(对不起我的英文,如果不清楚的话)

python multithreading save queue progress
2个回答
0
投票

首先,我建议不要去多线程。改为使用多处理。在涉及计算密集型任务时,由于GIL,多线程在python中不能同步工作。

要解决保存结果的问题,请按以下顺序使用

  1. 获取列表中所有文件的名称,并将列表分成块。
  2. 现在为每个进程分配一个块。
  3. 将每1000步后的已处理文件的名称附加到系统上的某个文件(例如monitor.txt)(假设您可以在发生故障时再次处理1000个文件)。
  4. 如果失败,请跳过monitor.txt中保存的所有文件,以用于每个进程。

您可以为每个进程安装monitor_1.txt,monitor_2.txt ...这样您就不必为每个进程读取整个文件。

以下要点可能对你有所帮助。你只需要为第4点添加代码。 https://gist.github.com/rishibarve/ccab04b9d53c0106c6c3f690089d0229


0
投票

保存文件等I / O操作总是比较慢。如果必须处理大量文件,无论使用多少线程,都会遇到很长的I / O时间。

最简单的方法是使用多线程而不是多处理,并让操作系统的调度程序将其全部用完。 The docs对如何设置线程有一个很好的解释。一个简单的例子就是

from threading import Thread

def process_data(file_name):
    # does the processing
    print(f'processed {file_name}')

if __name__ == '__main__':
    file_names = ['file_1', 'file_2']
    processes = [Thread(target=process_data, args=(file_name,)) for file_name in file_names]

    # here you start all the processes
    for proc in processes:
        proc.start()

    # here you wait for all processes to finish
    for proc in processes:
        proc.join()

一种可能更快的解决方案是创建一个执行I / O的单独进程。然后使用multiprocessing.Queue对“数据进程线程”中的文件进行排队,让I / O线程选择这些文件并一个接一个地处理它们。

这样I / O就不必休息,这将接近最佳状态。我不知道这是否会比基于线程的解决方案产生很大的优势,但通常情况下是并发性,最好的方法是使用自己的应用程序进行一些基准测试。

需要注意的一个问题是,如果数据处理速度更快,那么Queue可能会变得非常大。这可能会对性能产生影响,具体取决于您的系统等。一个快速的解决方法是在队列变大时暂停数据处理。

记得用脚本编写Python中的所有多处理代码

if __name__ == '__main__':
    # mp code

保护,并注意一些IDE与并发Python代码不兼容。安全的赌注是通过从终端执行代码来测试您的代码。

© www.soinside.com 2019 - 2024. All rights reserved.