Logging to a queue fails during multiprocessing


TL;DR: why are no handlers listed for the logger in the (inside run) print statement shown in the console output?

Looking for an explanation of why this logging scheme is not working.

I am following (pretty strictly) the approach for logging to a single log file from multiple processes found here in the Python docs. The main difference in the code below is that I am trying to implement a Worker class rather than just a function. Maybe I could switch back to a function, but it fits the larger scheme of the project better as a class. Anyhow...

When following the basic guidance of the docs, I can start the logging listener and it runs fine, but things break down when the worker tries to log. I can see the QueueHandler being added to the worker's logger during the configurer function, which is called during __init__, and the logging statement in __init__ is captured in the log as expected.

However, inside the run() method the wheels come off, as if the logger that was created has forgotten its handlers.

I am guessing there is some nuance to how a multiprocessing Process is started that causes this, but after spending some time troubleshooting I am stuck. I would appreciate an explanation of why this does not work, or a better way to configure a logger for a worker Process that is a class.

Code:

import logging
import random
import sys
import time
from logging import handlers
from multiprocessing import Queue, Process
from queue import Empty, Full


def worker_configurer(log_queue, idx):
    logger = logging.getLogger(".".join(("A", "worker", str(idx))))
    h = handlers.QueueHandler(log_queue)
    logger.addHandler(h)
    logger.setLevel(logging.INFO)
    print(
        f"configured worker {idx} with logger {logger.name} with handlers: {logger.handlers.copy()}"
    )
    return logger


class Worker(Process):
    worker_idx = 0

    def __init__(self, work_queue, log_queue, worker_configurer, **kwargs):
        super(Worker, self).__init__()
        self.idx = Worker.worker_idx
        Worker.worker_idx += 1
        self.logger = worker_configurer(log_queue, self.idx)
        print(f"self.logger handlers during init: {self.logger.handlers.copy()}")
        self.logger.info(f"worker {self.idx} initialized")  # <-- does show up in log
        self.work_queue = work_queue

    def run(self):
        print(
            f"(inside run): self.logger name: {self.logger.name}, handlers:"
            f" {self.logger.handlers.copy()}"
        )

        self.logger.info(f"worker {self.idx} started!")  # <-- will NOT show up in log
        while True:
            job_duration = self.work_queue.get()
            if job_duration is None:
                print(f"Worker {self.idx} received stop signal")
                break
            time.sleep(job_duration)
            # book the job...
            print(f"worker {self.idx} finished job of length {job_duration}")
            self.logger.info(f"worker {self.idx} finished job of length {job_duration}")


def listener_configurer():
    logging.basicConfig(
        filename="mp_log.log",
        filemode="a",
        format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
        datefmt="%d-%b-%y %H:%M:%S",
        level=logging.INFO,
    )


def listener_process(queue, configurer):
    configurer()  # redundant (for now), but harmless
    logger = logging.getLogger("A")
    while True:
        try:
            record = queue.get(timeout=5)
            print("bagged a message from the queue")
            if (
                record is None
            ):  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Empty:
            pass
        except Exception:
            import traceback  # sys is already imported at module scope

            print("Whoops! Problem:", file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


if __name__ == "__main__":
    listener_configurer()
    logger = logging.getLogger("A")
    logger.warning("Logger Active!")
    work_queue = Queue(5)
    log_queue = Queue(100)

    # start the logging listener
    listener = Process(target=listener_process, args=(log_queue, listener_configurer))
    listener.start()
    # make workers
    num_workers = 2
    workers = []
    for i in range(num_workers):
        w = Worker(
            work_queue,
            log_queue=log_queue,
            worker_configurer=worker_configurer,
        )
        w.start()
        workers.append(w)
        logger.info(f"worker {i} created")

    num_jobs = 10
    jobs_assigned = 0
    while jobs_assigned < num_jobs:
        try:
            work_queue.put(random.random() * 2, timeout=0.1)
            jobs_assigned += 1
        except Full:
            pass

    print("Call it a day and send stop sentinel to everybody")
    for i in range(num_workers):
        work_queue.put(None)
    log_queue.put(None)

    for w in workers:
        w.join()
        print("another worker retired!")

    listener.join()

Console:

configured worker 0 with logger A.worker.0 with handlers: [<QueueHandler (NOTSET)>]
self.logger handlers during init: [<QueueHandler (NOTSET)>]
configured worker 1 with logger A.worker.1 with handlers: [<QueueHandler (NOTSET)>]
self.logger handlers during init: [<QueueHandler (NOTSET)>]
bagged a message from the queue
bagged a message from the queue
(inside run): self.logger name: A.worker.1, handlers: []
(inside run): self.logger name: A.worker.0, handlers: []
worker 0 finished job of length 1.2150712953970373
worker 1 finished job of length 1.2574239731920005
worker 1 finished job of length 0.11736058130132943
Call it a day and send stop sentinel to everybody
worker 0 finished job of length 0.4843796181316009
worker 1 finished job of length 1.048915894468737
bagged a message from the queue
worker 0 finished job of length 1.2749454212499574
worker 0 finished job of length 0.7298640313585205
worker 1 finished job of length 1.6144333153092076
worker 1 finished job of length 1.219077068714904
Worker 1 received stop signal
worker 0 finished job of length 1.561689295025705
Worker 0 received stop signal
another worker retired!
another worker retired!

Log file:

08-May-24 17:33:15 | A | WARNING | Logger Active!
08-May-24 17:33:15 | A.worker.0 | INFO | worker 0 initialized
08-May-24 17:33:15 | A | INFO | worker 0 created
08-May-24 17:33:15 | A.worker.1 | INFO | worker 1 initialized
08-May-24 17:33:15 | A | INFO | worker 1 created
08-May-24 17:33:15 | A.worker.0 | INFO | worker 0 initialized
08-May-24 17:33:15 | A.worker.1 | INFO | worker 1 initialized
1 Answer
multiprocessing.set_start_method('fork')

Your code initializes the logger's handlers inside the main process, not inside the new process. With the default start method on Windows and macOS (spawn), the Worker instance is pickled into the child process, and a Logger pickles by name only, without its handlers, so the child ends up with a fresh, handler-less logger. That is exactly what the empty list printed inside run() shows.
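For example, a minimal sketch of where that call would go in the script from the question (assuming a POSIX system; the fork start method is not available on Windows):

import multiprocessing

if __name__ == "__main__":
    # set_start_method may only be called once per program, and it must
    # run before the Queue and Process objects below are created.
    multiprocessing.set_start_method("fork")
    listener_configurer()
    # ... rest of the __main__ block exactly as in the question ...

The difference between the two start methods, from the multiprocessing docs: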

spawn ... The child process will only inherit those resources necessary to run the process object's run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited.

fork ... All resources of the parent are inherited by the child process.

multiprocessing start methods
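An alternative that also works under the default spawn start method is to attach the handler in the child process itself, i.e. call worker_configurer inside run() rather than __init__. A minimal sketch of the reworked Worker from the question, keeping only picklable state in __init__:

import time
from multiprocessing import Process


class Worker(Process):
    worker_idx = 0

    def __init__(self, work_queue, log_queue, worker_configurer, **kwargs):
        super().__init__()
        self.idx = Worker.worker_idx
        Worker.worker_idx += 1
        # Keep only picklable state; defer handler setup to the child.
        self.work_queue = work_queue
        self.log_queue = log_queue
        self.worker_configurer = worker_configurer

    def run(self):
        # Executed in the child process, so the QueueHandler is attached
        # to the logger that actually lives there.
        logger = self.worker_configurer(self.log_queue, self.idx)
        logger.info(f"worker {self.idx} started!")
        while True:
            job_duration = self.work_queue.get()
            if job_duration is None:
                print(f"Worker {self.idx} received stop signal")
                break
            time.sleep(job_duration)
            logger.info(f"worker {self.idx} finished job of length {job_duration}")

This mirrors the function-based example in the docs: everything the child needs crosses the process boundary as plain picklable state, and the logger and its QueueHandler are created in the process that actually uses them.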
