具有多处理功能的冗余打印(或过程?)

问题描述 投票:0回答:1

这是我第一次尝试使用 Python 多处理库进行多处理。简单版本的代码如下 -

import multiprocessing as mp
from dataclasses import dataclass
from typing import Dict, NoReturn
import time
import logging
import signal

import numpy as np


@dataclass
class TmpData:
    name: str
    value: int


def worker(name: str, data: TmpData) -> NoReturn:
    logger_obj = mp.log_to_stderr()
    logger_obj.setLevel(logging.INFO)
    logger_obj.info(f"name: {name}; value: {data.value}")

    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")

    time.sleep(data.value)


def init_worker_processes() -> None:
    signal.signal(signal.SIGINT, signal.SIG_IGN)


if __name__ == "__main__":
    map_data: Dict[str, TmpData] = {
        key: TmpData(name=key, value=np.random.randint(5, 15))
        for key in ["ABC", "DEF", "XYZ"]
    }

    main_logger = logging.getLogger()
    with mp.get_context("spawn").Pool(
        processes=2,
        initializer=init_worker_processes(),
    ) as pool:
        results = []
        for key in map_data:
            try:
                results.append(
                    pool.apply_async(
                        worker,
                        args=(
                            key,
                            map_data[key],
                        ),
                    )
                )
            except KeyboardInterrupt:
                pool.terminate()

        pool.close()
        pool.join()

        for result in results:
            try:
                result.get()
            except Exception as err:
                main_logger.error(f"{err}")

这会输出类似以下内容 -

[INFO/SpawnPoolWorker-2] name: ABC; value: 10
[INFO/SpawnPoolWorker-1] name: DEF; value: 10
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process shutting down
[INFO/SpawnPoolWorker-2] process exiting with exitcode 0
[INFO/SpawnPoolWorker-1] process exiting with exitcode 0
XYZ worker failed

我担心的是

[INFO/SpawnPoolWorker-2] name: XYZ; value: 12
打印了两次。我猜这只是打印问题(不是产生 2 个进程,因为 XYZ 工作失败的错误消息只出现一次)。使用 3 个进程初始化池时不会出现此问题。

现在,我想了解根本原因是什么以及如何解决它。有人可以帮助我了解我可能做错了什么以及如何解决它吗?

python python-3.x multiprocessing
1个回答
0
投票

问题是如何添加 stderr 记录器。当您调用

mp.log_to_stderr
时,线程不会删除现有记录器,而是向标准输出添加额外的记录器。因此,每次运行
worker
时,您都会向现有处理程序添加额外的
logging.StreamHandler
处理程序。

那么发生了什么:

  1. 线程 1 接受第一个作业并创建第一个
    logging.StreamHandler
  2. 线程 2 执行第二个作业并创建第一个作业
    logging.StreamHandler
  3. 线程 1 完成第一项工作(例如)
  4. 线程 1 承担第三个作业并创建第二个
    logging.StreamHandler
    。现在它将打印所有日志两次。

引入记录器的正确方法是使用

init_worker_processes
:

def worker(name: str, data: TmpData) -> NoReturn:
    # get existing logger which already has a stdout StreamHandler
    logger = mp.get_logger() 
    logger.info(f"name: {name}; value: {data.value}")

    if name == "XYZ":
        raise RuntimeError("XYZ worker failed")

    time.sleep(0.01)


def init_worker_processes() -> None:
    # this only runs single time per each thread
    logger = mp.log_to_stderr() 
    logger.setLevel(logging.INFO)

    signal.signal(signal.SIGINT, signal.SIG_IGN)

希望这有帮助

© www.soinside.com 2019 - 2024. All rights reserved.