This is my first time using Python's multiprocessing library. A simplified version of my code is below:
import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict
import time
import logging
import signal
import numpy as np


@dataclass
class TmpData:
    name: str
    value: int


def worker(name: str, data: TmpData) -> None:
    logger_obj = mp.log_to_stderr()
    logger_obj.setLevel(logging.INFO)
    logger_obj.info(f"name: {name}; value: {data.value}")
    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")
    time.sleep(data.value)


def init_worker_processes() -> None:
    # Workers ignore Ctrl+C; the parent handles it.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(5, 15))
        for key in ["ABC", "DEF", "XYZ"]
    }
    main_logger = logging.getLogger()
    with mp.get_context("spawn").Pool(
        processes=2,
        initializer=init_worker_processes,  # pass the function itself; don't call it here
    ) as pool:
        results = []
        for key in map_data:
            try:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(
                            key,
                            map_data[key],
                        ),
                    )
                )
            except KeyboardInterrupt:
                pool.terminate()
        pool.close()
        pool.join()
        for result in results:
            try:
                result.get()
            except Exception as err:
                main_logger.error(f"{err}")
This prints something like the following:
[INFO/SpawnPoolWorker-2] name: ABC; value: 10
[INFO/SpawnPoolWorker-1] name: DEF; value: 10
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process exiting with exitcode 0
XYZ worker failed
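My concern is that
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
is printed twice. I'm guessing this is only a printing issue and that two processes were not created, because the "XYZ worker failed" error message appears only once. The problem does not occur when I initialize the pool with 3 processes.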
Now I'd like to understand the root cause and how to fix it. Can someone help me understand what I'm doing wrong and how to fix it?
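The problem is how the stderr logger is added. When you call mp.log_to_stderr, it does not remove the handlers already attached to the multiprocessing logger; it adds one more handler that streams to stderr. In other words, every time def worker(...) runs, you add another logging.StreamHandler to the handlers already on that process's logger. Because the pool reuses its worker processes, a process that runs two tasks ends up with two handlers. That is also why the problem usually disappears with processes=3: each process then typically runs only one task.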
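The stacking can be reproduced in a single process, without a pool. The sketch below is a minimal illustration under a few assumptions: count_copies is a hypothetical helper, it temporarily points sys.stderr at an io.StringIO buffer so the output can be counted (the StreamHandler that log_to_stderr creates binds to whatever sys.stderr is at creation time), and it clears the multiprocessing logger first so nothing else is attached.

```python
import io
import logging
import multiprocessing as mp
import sys


def count_copies(setup_calls: int) -> int:
    """Call mp.log_to_stderr() `setup_calls` times, log one message,
    and return how many copies of that message were written."""
    logger = mp.get_logger()
    # Start from a clean logger so repeated demo runs don't interfere.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    buffer = io.StringIO()
    real_stderr = sys.stderr
    sys.stderr = buffer  # each new StreamHandler binds to this buffer
    try:
        for _ in range(setup_calls):
            # Same call the question makes at the top of worker():
            # every call attaches one more StreamHandler.
            mp.log_to_stderr(logging.INFO)
    finally:
        sys.stderr = real_stderr

    logger.info("name: XYZ; value: 12")
    return buffer.getvalue().count("name: XYZ; value: 12")


if __name__ == "__main__":
    print(count_copies(1))  # one handler  -> one copy
    print(count_copies(2))  # two handlers -> two copies
```

One setup call yields one copy of the message; two calls in the same process, as happens when a pool worker runs worker() twice, yield two.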
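Step by step:
1. SpawnPoolWorker-2 runs the ABC task; log_to_stderr adds a first logging.StreamHandler to that process's logger.
2. SpawnPoolWorker-1 runs the DEF task; its own logger gets one logging.StreamHandler.
3. SpawnPoolWorker-2 picks up the XYZ task; log_to_stderr adds a second logging.StreamHandler. From then on that process prints every log line twice, which is also why its "process shutting down" and "process exiting" lines appear twice.

To print the handlers currently attached to the logger, you can use this snippet: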
def worker(name: str, data: TmpData) -> None:
    _ = mp.log_to_stderr()
    # print the handlers currently attached to the multiprocessing logger
    logger = mp.get_logger()
    process_name = mp.current_process().name
    print(process_name, logger.handlers)
This outputs:
SpawnPoolWorker-1 [<StreamHandler <stderr> (NOTSET)>]
SpawnPoolWorker-2 [<StreamHandler <stderr> (NOTSET)>]
SpawnPoolWorker-1 [<StreamHandler <stderr> (NOTSET)>, <StreamHandler <stderr> (NOTSET)>]
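As you can see, the process that ran a second task (SpawnPoolWorker-1 in this run; which one it is can vary between runs) has two StreamHandler objects, so it prints every message twice, once per handler.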
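If the setup call has to stay inside worker(), one alternative to the initializer-based fix is to make it idempotent by checking for an existing handler first. This is a sketch, not part of the original answer: get_worker_logger is a hypothetical name, the worker is trimmed down to return its handler count, and the check assumes no other code attaches handlers to the multiprocessing logger.

```python
import logging
import multiprocessing as mp


def get_worker_logger() -> logging.Logger:
    """Return the multiprocessing logger, attaching a stderr handler
    only the first time this is called in the current process."""
    logger = mp.get_logger()
    if not logger.handlers:
        # First call in this process: add exactly one StreamHandler.
        mp.log_to_stderr()
    logger.setLevel(logging.INFO)
    return logger


def worker(name: str) -> int:
    # Hypothetical, trimmed-down worker: logs once, reports handler count.
    logger = get_worker_logger()
    logger.info("name: %s", name)
    return len(logger.handlers)


if __name__ == "__main__":
    # Simulate one pool process running two tasks back to back.
    print(worker("ABC"), worker("XYZ"))
```

However many tasks a process runs, its logger keeps a single handler, so each message is printed once.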
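Solution:
The right place to set up the logger is init_worker_processes, which the pool runs exactly once in each worker process when that process starts. Pass the function itself to Pool (initializer=init_worker_processes, without parentheses); writing init_worker_processes() calls it once in the parent process and passes its return value, None, as the initializer.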
def worker(name: str, data: TmpData) -> None:
    # get the existing logger, which already has a stderr StreamHandler
    logger = mp.get_logger()
    logger.info(f"name: {name}; value: {data.value}")
    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")
    time.sleep(0.01)


def init_worker_processes() -> None:
    # runs only once in each worker process
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)
    signal.signal(signal.SIGINT, signal.SIG_IGN)
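A pool process calls its initializer once and then runs any number of tasks, so with this layout the handler count no longer grows. The sketch below imitates that lifecycle in a single process, an assumption made so it runs without spawning workers: simulate_pool_process is a hypothetical helper that clears the logger (as a freshly spawned process would have none), calls the initializer once, then runs a trimmed-down worker for each task name. The SIGINT line is left out because the demo runs in the main process.

```python
import logging
import multiprocessing as mp
from typing import List


def init_worker_processes() -> None:
    # Logging part of the answer's initializer; the SIGINT line is
    # omitted because this demo runs in the main process.
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)


def worker(name: str) -> int:
    # Reuse the logger configured by the initializer; add no handlers here.
    logger = mp.get_logger()
    logger.info("name: %s", name)
    return len(logger.handlers)


def simulate_pool_process(task_names: List[str]) -> List[int]:
    """Imitate one pool process: initializer once, then every task."""
    logger = mp.get_logger()
    # A freshly spawned process starts with no handlers; mimic that.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
    init_worker_processes()
    return [worker(name) for name in task_names]


if __name__ == "__main__":
    print(simulate_pool_process(["ABC", "XYZ"]))
```

Every task sees exactly one handler, which is the behavior you get in the real pool once the setup moves into the initializer.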
Hope this helps.