I'm new to multithreading and want to explore it. I have a JSON file that provides some configuration, and based on it I need to kick off some processing. Here is the configuration:
{
"job1":{
"param1":"val1",
"param2":"val2"
},
"job2":{
"param3":"val3",
"param4":"val4"
}
}
Here is the Python snippet:
import json

config_file = open('config.json')
config_data = json.load(config_file)
for job_name, job_atts in config_data.items():
    perform_job(job_name, job_atts)
This way I can perform the jobs one after another.
Is there a way to run/launch these jobs in parallel? Note that the jobs are completely independent of each other and don't need to run in any particular order.
How can I achieve parallel execution in Python?
Update
Here is what I tried:
>>> from multiprocessing import Pool
>>> import json
>>>
>>> config_data = json.loads(''' {
... "job1":{
... "param1":"val1",
... "param2":"val2"
... },
... "job2":{
... "param3":"val3",
... "param4":"val4"
... }
... }''')
>>> def perform_job(job_name, job_atts):
...     print(job_name)
...     print(job_atts)
...
>>> args = [(name, attrs)
...         for name, attrs in config_data.items()]
>>>
>>> with Pool() as pool:
...     pool.starmap(perform_job, args)
...
Process SpawnPoolWorker-27:
Process SpawnPoolWorker-24:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 114, in worker
task = get()
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 368, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'perform_job' on <module '__main__' (built-in)>
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 114, in worker
task = get()
File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 368, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'perform_job' on <module '__main__' (built-in)>
But I still get the error.
You are looking for the multiprocessing module. Use a process pool to iterate over many jobs.
Here is an example:
spawn.py
A source file that runs correctly.
The most relevant lines are the last two.
#! /usr/bin/env python
from multiprocessing import Pool
import json

config_data = json.loads(
    """ {
        "job1":{
            "param1":"val1",
            "param2":"val2"
        },
        "job2":{
            "param3":"val3",
            "param4":"val4"
        }
    } """
)
args = list(config_data.items())

def perform_job(job_name, job_atts):
    print(job_name, job_atts)

if __name__ == "__main__":
    # Single core:
    perform_job(*args[0])
    perform_job(*args[1])
    print()

    # Multi core:
    with Pool() as pool:
        pool.starmap(perform_job, args)
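The interactive attempt in the question fails because, under the "spawn" start method (the default on macOS and Windows), each worker process re-imports __main__ to find perform_job; a REPL session has no importable source file, so the lookup raises the AttributeError shown. Running the code from a module file, with pool creation behind the __name__ guard as above, is what fixes it. A minimal sketch to inspect which start method your platform uses:

```python
# Sketch: show the active multiprocessing start method.
# "spawn" re-imports __main__ in each worker, which is why the
# interactive attempt failed; "fork" (Linux default) copies the
# parent process instead and would not hit that error.
import multiprocessing

print(multiprocessing.get_start_method())
```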
Besides multiprocessing and threading, you can also use concurrent.futures, which is higher-level and simpler:
import json
from concurrent.futures import ThreadPoolExecutor

def perform_job(name, atts):
    print(f"{name}: {atts}")

with open("config.json") as stream:
    config_data = json.load(stream)

with ThreadPoolExecutor() as executor:
    for job_name, job_atts in config_data.items():
        executor.submit(perform_job, job_name, job_atts)
Output:
job1: {'param1': 'val1', 'param2': 'val2'}
job2: {'param3': 'val3', 'param4': 'val4'}
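If the jobs should return values rather than just print, the submitted futures can be collected with as_completed, which yields each future as it finishes. A minimal sketch under the assumption that perform_job returns a string (the config is inlined here instead of read from config.json, to keep it self-contained):

```python
# Sketch: gather results from independent jobs run on a thread pool.
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

config_data = json.loads('{"job1": {"param1": "val1"}, "job2": {"param3": "val3"}}')

def perform_job(name, atts):
    # Assumed variant of perform_job that returns instead of printing.
    return f"{name}: {atts}"

results = []
with ThreadPoolExecutor() as executor:
    # Submit every job up front; each call returns a Future immediately.
    futures = [executor.submit(perform_job, n, a) for n, a in config_data.items()]
    # as_completed yields futures in completion order, not submission order.
    for future in as_completed(futures):
        results.append(future.result())

print(sorted(results))
```

Threads are a good fit here if the jobs are I/O-bound (files, network); for CPU-bound work, ProcessPoolExecutor is a near drop-in swap that sidesteps the GIL, with the same __main__-guard caveat as multiprocessing.Pool.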