优化 Python 3.x 中的内存密集型并行处理

问题描述 投票:0回答:1

我一直在利用

multiprocessing
模块开展一个涉及 Python 3.x 中内存密集型并行处理的项目。手头的任务需要处理无法完全放入内存的大型数据集。在尝试了
multiprocessing.Pool
后,我遇到了由于数据集在进程之间重复而导致的内存问题。

在尝试缓解此问题时,我考虑利用共享内存技术,例如

multiprocessing.shared_memory
或使用
mmap
的内存映射文件。但是,我不确定我的特定场景的最佳实现。此外,潜在的同步问题以及全局解释器锁 (GIL) 对多进程执行的影响也值得关注。

这是我迄今为止尝试过的:

import multiprocessing

def process_data(chunk):
    # Process the data chunk
    pass

if __name__ == "__main__":
    data = ...  # Large dataset
    chunk_size = len(data) // multiprocessing.cpu_count()

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        pool.map(process_data, [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)])

我的期望: 我希望代码能够跨进程有效地分配数据块,避免过多的内存使用。每个进程都会处理其分配的数据块,利用并行处理功能,并理想地减少内存开销。

实际发生了什么: 然而,由于 Python 多处理的工作方式,

data
对象会在进程之间重复,导致内存消耗较高。此外,我不确定处理进程间同步的最佳方法以及如何解决 GIL 施加的潜在限制。

我正在寻求建议、代码示例和最佳实践来应对这些挑战并在 Python 3.x 中实现内存高效的并行处理。谢谢您的协助!

python-3.x memory-management parallel-processing multiprocessing synchronization
1个回答
0
投票

在 Python 3.x 中处理内存密集型并行处理时,您可以采用一些策略来优化内存使用并解决您提到的问题。

  1. 共享内存:一种方法是使用共享内存技术来避免跨进程复制数据集。您提到了

    multiprocessing.shared_memory
    和使用
    mmap
    的内存映射文件。根据您的具体要求,这两种都是可行的选择。

    • multiprocessing.shared_memory
      :该模块提供了一种创建可由多个进程访问的共享内存块的方法。您可以分配一个共享内存块,然后用它来存储数据集。每个进程都可以访问共享内存块,而无需复制数据。但是,请注意,此模块仅从 Python 3.8 开始可用。

    • mmap
      :内存映射文件允许您将文件映射到内存中并像访问大型数组一样访问它。您可以创建内存映射文件并将数据集存储在其中。然后每个进程都可以访问该文件而无需复制数据。这种方法对于不能完全放入内存的大型数据集非常有效。但是,请记住,内存映射文件有一些开销,您需要手动管理同步。

  2. 同步:使用多进程时,需要考虑同步,以保证数据一致性,避免竞争情况。 Python 在

    multiprocessing
    模块中提供了各种同步原语,例如
    Lock
    Semaphore
    Event
    。您可以使用这些原语来协调对共享资源的访问或实现自定义同步机制。

    例如,如果您使用共享内存,则可以使用

    Lock
    来保护多个进程可能同时访问或修改共享内存的代码关键部分。

  3. 全局解释器锁(GIL):Python 中的 GIL 可以防止多个本机线程同时执行 Python 字节码。但是,它并不能阻止跨多个进程的并行执行。在你的例子中,由于你使用的是

    multiprocessing
    模块,每个进程都会有自己的 Python 解释器和自己的 GIL。这允许跨进程真正并行执行,有效利用多个 CPU 核心。

    请记住,如果您使用线程而不是进程,GIL 将限制并行性。但是,通过进程,您可以充分利用可用的 CPU 内核。

以下是如何修改代码以使用共享内存和同步的示例:

import multiprocessing
import numpy as np

def process_data(shared_data, chunk_index):
    # Access the shared data using the chunk_index
    chunk = shared_data[chunk_index]
    # Process the chunk
    pass

if __name__ == "__main__":
    data = ...  # Large dataset
    chunk_size = len(data) // multiprocessing.cpu_count()

    # Create a shared memory block
    shared_data = multiprocessing.RawArray('i', len(data))
    np_data = np.frombuffer(shared_data, dtype=np.int32)
    np_data[:] = data

    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        pool.map(process_data, range(multiprocessing.cpu_count()))

在此示例中,我们使用

multiprocessing.RawArray
创建一个共享内存块并将数据集存储在其中。每个进程都可以使用块索引访问共享内存块。
process_data
函数将共享数据和块索引作为参数并处理相应的块。

如果多个进程同时访问或修改共享内存,请记住适当处理同步。

通过使用共享内存和适当的同步,您可以在 Python 3.x 中实现内存高效的并行处理,同时避免过多的内存使用并解决与 GIL 相关的问题。

© www.soinside.com 2019 - 2024. All rights reserved.