这是我第一次尝试使用 Python 多处理库进行多处理。简单版本的代码如下 -
import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn
import time
import logging
import signal
import numpy as np
@dataclass
class TmpData:
name: str
value: int
def worker(name: str, data: TmpData) -> NoReturn:
logger_obj = mp.log_to_stderr()
logger_obj.setLevel(logging.INFO)
logger_obj.info(f"name: {name}; value: {data.value}")
if name == "XYZ":
raise RuntimeError("XYZ worker failed")
time.sleep(data.value)
def init_worker_processes() -> None:
signal.signal(signal.SIGINT, signal.SIG_IGN)
if __name__ == "__main__":
map_data: Dict[str, TmpData] = {
key: TmpData(name=key, value=np.random.randint(5, 15))
for key in ["ABC", "DEF", "XYZ"]
}
main_logger = logging.getLogger()
with mp.get_context("spawn").Pool(
processes=2,
initializer=init_worker_processes(),
) as pool:
results = []
for key in map_data:
try:
results.append(
pool.apply_async(
worker,
args=(
key,
map_data[key],
),
)
)
except KeyboardInterrupt:
pool.terminate()
pool.close()
pool.join()
for result in results:
try:
result.get()
except Exception as err:
main_logger.error(f"{err}")
这会输出类似以下内容 -
[INFO/SpawnPoolWorker-2] name: ABC; value: 10
[INFO/SpawnPoolWorker-1] name: DEF; value: 10
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process exiting with exitcode 0
XYZ worker failed
我担心的是
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
打印了两次。我猜这只是打印问题(不是产生 2 个进程,因为 XYZ 工作失败的错误消息只出现一次)。使用 3 个进程初始化池时不会出现此问题。
现在,我想了解根本原因是什么以及如何解决它。有人可以帮助我了解我可能做错了什么以及如何解决它吗?
问题是如何添加 stderr 记录器。当您调用
mp.log_to_stderr
时,线程不会删除现有记录器,而是向标准输出添加额外的记录器。因此,每次运行 worker
时,您都会向现有处理程序添加额外的 logging.StreamHandler
处理程序。
那么发生了什么:
logging.StreamHandler
logging.StreamHandler
logging.StreamHandler
。现在它将打印所有日志两次。引入记录器的正确方法是使用
init_worker_processes
:
def worker(name: str, data: TmpData) -> NoReturn:
# get existing logger which already has a stdout StreamHandler
logger = mp.get_logger()
logger.info(f"name: {name}; value: {data.value}")
if name == "XYZ":
raise RuntimeError("XYZ worker failed")
time.sleep(0.01)
def init_worker_processes() -> None:
# this only runs single time per each thread
logger = mp.log_to_stderr()
logger.setLevel(logging.INFO)
signal.signal(signal.SIGINT, signal.SIG_IGN)
希望这有帮助