Multiprocessing redundant prints

Question (votes: 0, answers: 1)

This is my first attempt at multiprocessing with Python's multiprocessing library. A simplified version of my code is below:

import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn
import time
import logging
import signal

import numpy as np


@dataclass
class TmpData:
    name: str
    value: int


def worker(name: str, data: TmpData) -> NoReturn:
    logger_obj = mp.log_to_stderr()
    logger_obj.setLevel(logging.INFO)
    logger_obj.info(f"name: {name}; value: {data.value}")

    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")

    time.sleep(data.value)


def init_worker_processes() -> None:
    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(5, 15))
        for key in ["ABC", "DEF", "XYZ"]
    }

    main_logger = logging.getLogger()
    with mp.get_context("spawn").Pool(
        processes=2,
        initializer=init_worker_processes(),
    ) as pool:
        results = []
        for key in map_data:
            try:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(
                            key,
                            map_data[key],
                        ),
                    )
                )
            except KeyboardInterrupt:
                pool.terminate()

        pool.close()
        pool.join()

        for result in results:
            try:
                result.get()
            except Exception as err:
                main_logger.error(f"{err}")

This outputs something like the following:

[INFO/SpawnPoolWorker-2] name: ABC; value: 10
[INFO/SpawnPoolWorker-1] name: DEF; value: 10
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process exiting with exitcode 0
XYZ worker failed

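What concerns me is that the line

[INFO/SpawnPoolWorker-2] name: XYZ; value: 12

is printed twice. I assume this is only a logging issue and not a second process being spawned, because the "XYZ worker failed" error message appears only once. The problem does not occur when the pool is initialized with 3 processes.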

I would like to understand the root cause and how to fix it. Can someone help me see what I might be doing wrong?

python python-3.x multiprocessing
1 Answer (2 votes)

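The problem is how the stderr handler is added. A call to mp.log_to_stderr() does not remove the handlers already attached to the multiprocessing logger; it attaches one more handler that writes to stderr. In other words, every time worker(...) runs, it adds another logging.StreamHandler to the logger of the process it runs in. Note that pool workers are processes, not threads, and each one has its own copy of the logger.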

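Step by step:

  1. Process 1 takes the first job and creates its first logging.StreamHandler.
  2. Process 2 takes the second job and creates its first logging.StreamHandler.
  3. Process 1 finishes the first job (for example).
  4. Process 1 takes the third job and creates its second logging.StreamHandler. From now on it prints every log record twice.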
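
The accumulation described in the steps above can be reproduced in a single process, without a pool. This is a minimal sketch; the helper name count_added_handlers is hypothetical and exists only for this illustration.

```python
import multiprocessing as mp


def count_added_handlers(calls: int) -> int:
    """Call mp.log_to_stderr() `calls` times and report how many handlers were added."""
    logger = mp.get_logger()
    before = len(logger.handlers)
    for _ in range(calls):
        # Each call attaches a new StreamHandler; earlier ones are kept.
        mp.log_to_stderr()
    return len(logger.handlers) - before


print(count_added_handlers(2))  # prints 2
```

A pool worker that handles two jobs is in the same situation: it runs worker(...) twice in one process, so its logger ends up with two handlers.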

To print the handlers currently attached to the logger, you can use the following snippet:

def worker(name: str, data: TmpData) -> NoReturn:
    _ = mp.log_to_stderr()

    # Print the handlers currently attached to this process's logger
    logger = mp.get_logger()
    process_name = mp.current_process().name
    print(process_name, logger.handlers)

This outputs:

SpawnPoolWorker-1 [<StreamHandler <stderr> (NOTSET)>]
SpawnPoolWorker-2 [<StreamHandler <stderr> (NOTSET)>]
SpawnPoolWorker-1 [<StreamHandler <stderr> (NOTSET)>, <StreamHandler <stderr> (NOTSET)>]

As you can see, SpawnPoolWorker-1 has two StreamHandlers by the time it runs its second job. It therefore prints every message twice, once per handler.
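
The "once per handler" behaviour is ordinary logging behaviour and can be checked in isolation. The sketch below uses a throwaway logger named duplicate_handler_demo (a hypothetical name) and an in-memory buffer in place of stderr, so the duplicated lines can be inspected.

```python
import io
import logging

buffer = io.StringIO()

demo_logger = logging.getLogger("duplicate_handler_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the root logger out of the picture

# Attach two handlers that write to the same stream.
for _ in range(2):
    demo_logger.addHandler(logging.StreamHandler(buffer))

# One logging call, but each handler emits the record.
demo_logger.info("name: XYZ; value: 12")

lines = buffer.getvalue().splitlines()
print(lines)  # prints ['name: XYZ; value: 12', 'name: XYZ; value: 12']
```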

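Solution:

The right place to add the handler is init_worker_processes, which the pool runs once in each worker process. For this to work, the pool must receive the function itself (initializer=init_worker_processes); writing initializer=init_worker_processes() calls it once in the parent process and passes None to the pool.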

def worker(name: str, data: TmpData) -> NoReturn:
    # Get the existing logger, which already has a stderr StreamHandler
    logger = mp.get_logger()
    logger.info(f"name: {name}; value: {data.value}")

    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")

    time.sleep(0.01)


def init_worker_processes() -> None:
    # This runs only once per worker process
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    signal.signal(signal.SIGINT, signal.SIG_IGN)
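
For completeness, here is a sketch of how the two functions above might be wired into the pool. The function name run_pool and the zero sleep durations are assumptions made to keep the example short; the rest follows the code in the question.

```python
import logging
import multiprocessing as mp
import signal
import time
from dataclasses import dataclass


@dataclass
class TmpData:
    name: str
    value: int


def init_worker_processes() -> None:
    # Runs once in each worker process, so exactly one handler is attached.
    mp.log_to_stderr(logging.INFO)
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def worker(name: str, data: TmpData) -> None:
    # Reuse the logger configured by the initializer; add no handlers here.
    mp.get_logger().info(f"name: {name}; value: {data.value}")
    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")
    time.sleep(data.value)


def run_pool() -> list:
    """Run three jobs on two workers and return the collected error messages."""
    # Assumption: value=0 so the jobs finish immediately.
    jobs = {key: TmpData(name=key, value=0) for key in ["ABC", "DEF", "XYZ"]}
    errors = []
    with mp.get_context("spawn").Pool(
        processes=2,
        initializer=init_worker_processes,  # the function itself, no parentheses
    ) as pool:
        results = [pool.apply_async(worker, args=(k, v)) for k, v in jobs.items()]
        pool.close()
        pool.join()
        for result in results:
            try:
                result.get()
            except RuntimeError as err:
                errors.append(str(err))
    return errors
```

Call run_pool() from under an `if __name__ == "__main__":` guard in a script file, which the spawn start method requires. With two workers and three jobs, one worker handles two jobs, and each log line should now appear once.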

Hope this helps.
