Python 3: How to spawn jobs in parallel

Question (votes: 0, answers: 2)

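I am new to multithreading and would like to explore it. I have a JSON file that provides some configuration, and I need to kick off some processing based on it. Here is the configuration: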

{
    "job1":{
        "param1":"val1",
        "param2":"val2"
    },
    "job2":{
        "param3":"val3",
        "param4":"val4"
    }
}

Here is the Python snippet:

import json

with open('config.json') as config_file:
    config_data = json.load(config_file)

# Run each job in turn, sequentially
for job_name, job_atts in config_data.items():
    perform_job(job_name, job_atts)

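This way, the jobs run one after another.

Is there a way to run or launch these jobs in parallel? Note that the jobs are completely independent of each other and do not need to run in any particular order.

How can I run them in parallel with Python?

Update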

Here is what I tried:

    >>> from multiprocessing import Pool
    >>> 
    >>> config_data = json.loads(''' {
    ...     "job1":{
    ...         "param1":"val1",
    ...         "param2":"val2"
    ...     },
    ...     "job2":{
    ...         "param3":"val3",
    ...         "param4":"val4"
    ...     }
    ... }''')
    >>> def perform_job(job_name,job_atts):
    ...     print(job_name)
    ...     print(job_atts)
    ... 
    >>> args = [(name, attrs)
    ...         for name, attrs in config_data.items()]
    >>> 
    >>> with Pool() as pool:
    ...     pool.starmap(perform_job, args)
    ... 
    Process SpawnPoolWorker-27:
    Process SpawnPoolWorker-24:
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
        self.run()
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 114, in worker
        task = get()
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 368, in get
        return _ForkingPickler.loads(res)
    AttributeError: Can't get attribute 'perform_job' on <module '__main__' (built-in)>
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
        self.run()
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 114, in worker
        task = get()
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 368, in get
        return _ForkingPickler.loads(res)
    AttributeError: Can't get attribute 'perform_job' on <module '__main__' (built-in)>
    

But I still get an error message.

python python-3.x multithreading python-multithreading
2 Answers

1 vote

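You are looking for the multiprocessing module. Use a process pool to iterate over many jobs.

Here is an example:

spawn.py
A source file that runs correctly when executed as a script. The most relevant lines are the last two.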

#! /usr/bin/env python
from multiprocessing import Pool
import json

config_data = json.loads(
    """ {
    "job1":{
        "param1":"val1",
        "param2":"val2"
    },
    "job2":{
        "param3":"val3",
        "param4":"val4"
    }
} """
)
args = list(config_data.items())


def perform_job(job_name, job_atts):
    print(job_name, job_atts)


if __name__ == "__main__":

    # Single core:
    perform_job(*args[0])
    perform_job(*args[1])
    print()

    # Multi core:
    with Pool() as pool:
        pool.starmap(perform_job, args)
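
The worker names in the question's traceback (SpawnPoolWorker) suggest the spawn start method, where each worker starts a fresh interpreter and re-imports `__main__`. A function typed at the `>>>` prompt does not exist in that fresh interpreter, which would explain the `Can't get attribute 'perform_job'` error. Below is a minimal sketch of the same pool that also collects return values. It assumes the code is saved to a file and run as a script; `CONFIG_JSON`, `build_args`, `main`, and the body of `perform_job` are illustrative placeholders, not part of the original post.

```python
import json
from multiprocessing import Pool

# Inline copy of the configuration from the question (stand-in for config.json).
CONFIG_JSON = """
{
    "job1": {"param1": "val1", "param2": "val2"},
    "job2": {"param3": "val3", "param4": "val4"}
}
"""


def perform_job(job_name, job_atts):
    # Placeholder job body: report how many parameters the job received.
    return f"{job_name}: {len(job_atts)} parameter(s)"


def build_args(config_data):
    # One (name, attributes) tuple per job, the shape starmap unpacks.
    return list(config_data.items())


def main():
    config_data = json.loads(CONFIG_JSON)
    with Pool() as pool:
        # starmap blocks until all jobs finish and returns results in input order.
        for line in pool.starmap(perform_job, build_args(config_data)):
            print(line)


if __name__ == "__main__":
    # The guard keeps worker processes from re-running main() on import.
    main()
```

Defining `perform_job` at module level in a real file is what lets the workers find it by name when they unpickle each task.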


0 votes

Besides multiprocessing and threading, you can also use concurrent.futures, which is higher-level and simpler:

import json
from concurrent.futures import ThreadPoolExecutor


def perform_job(name, atts):
    print(f"{name}: {atts}")


with open("config.json") as stream:
    config_data = json.load(stream)

with ThreadPoolExecutor() as executor:
    for job_name, job_atts in config_data.items():
        executor.submit(perform_job, job_name, job_atts)

Output:

job1: {'param1': 'val1', 'param2': 'val2'}
job2: {'param3': 'val3', 'param4': 'val4'}
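
One thing to watch with `executor.submit`: an exception raised inside a job is stored on the returned future and only surfaces when `result()` is called, so a fire-and-forget loop can hide failures. The sketch below shows one way to collect results and errors per job. The failing condition in `perform_job` and the helper `run_jobs` are hypothetical, added only to illustrate the pattern.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def perform_job(name, atts):
    # Hypothetical job body: fail when a job has no parameters.
    if not atts:
        raise ValueError(f"{name} has no parameters")
    return len(atts)


def run_jobs(config_data):
    results, errors = {}, {}
    with ThreadPoolExecutor() as executor:
        # Map each future back to the job name that produced it.
        futures = {
            executor.submit(perform_job, name, atts): name
            for name, atts in config_data.items()
        }
        # as_completed yields futures as they finish, in no fixed order.
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()  # re-raises the job's exception
            except Exception as exc:
                errors[name] = exc
    return results, errors


results, errors = run_jobs(
    {"job1": {"param1": "val1", "param2": "val2"}, "job2": {}}
)
print(results)
print(errors)
```

Threads tend to suit I/O-bound jobs. For CPU-bound work, `ProcessPoolExecutor` offers the same `submit` interface, with the same requirement that the job function be importable from a module or script file.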