TL;DR:为什么控制台视图中的(内部运行)打印语句中没有列出记录器的处理程序?
寻找此日志记录方案无法正常工作的原因的解释。
我正在遵循(非常严格地)将多个进程记录到在 python dox 中找到的here的同一日志文件中的方法。下面的代码的主要区别是我试图实现一个 Worker 类而不仅仅是一个函数。也许我可以切换回函数,但它作为一个类更适合项目的更大方案。无论如何...
当遵循 dox 的基本指导时,我可以启动日志侦听器功能并正常运行,但当
worker
尝试记录时,事情就会崩溃。我可以看到队列处理程序在配置函数期间添加到工作记录器中,在 __init__
期间调用,并且 init 中的日志记录语句is 按预期在日志中捕获。
但是,在
run()
函数内部,轮子掉下来了,就好像创建的记录器忘记了它的处理程序。
我猜测
multiprocessing
进程的启动方式存在一些细微差别,导致出现这种情况,但是花了一些时间来 T/S,我对解释为什么这不起作用或为工作人员配置记录器的更好方法 Process
这是一个类。
import logging
import random
import sys
import time
from logging import handlers
from multiprocessing import Queue, Process
from queue import Empty, Full
def worker_configurer(log_queue, idx):
logger = logging.getLogger(".".join(("A", "worker", str(idx))))
h = handlers.QueueHandler(log_queue)
logger.addHandler(h)
logger.setLevel(logging.INFO)
print(
f"configured worker {idx} with logger {logger.name} with handlers: {logger.handlers.copy()}"
)
return logger
class Worker(Process):
worker_idx = 0
def __init__(self, work_queue, log_queue, worker_configurer, **kwargs):
super(Worker, self).__init__()
self.idx = Worker.worker_idx
Worker.worker_idx += 1
self.logger = worker_configurer(log_queue, self.idx)
print(f"self.logger handlers during init: {self.logger.handlers.copy()}")
self.logger.info(f"worker {self.idx} initialized") # <-- does show up in log
self.work_queue = work_queue
def run(self):
print(
f"(inside run): self.logger name: {self.logger.name}, handlers:"
f" {self.logger.handlers.copy()}"
)
self.logger.info(f"worker {self.idx} started!") # <-- will NOT show up in log
while True:
job_duration = self.work_queue.get()
if job_duration is None:
print(f"Worker {self.idx} received stop signal")
break
time.sleep(job_duration)
# book the job...
print(f"worker {self.idx} finished job of length {job_duration}")
self.logger.info(f"worker {self.idx} finished job of length {job_duration}")
def listener_configurer():
logging.basicConfig(
filename="mp_log.log",
filemode="a",
format="%(asctime)s | %(name)s | %(levelname)s | %(message)s",
datefmt="%d-%b-%y %H:%M:%S",
level=logging.INFO,
)
def listener_process(queue, configurer):
configurer() # redundant (for now), but harmless
logger = logging.getLogger("A")
while True:
try:
record = queue.get(timeout=5)
print("bagged a message from the queue")
if (
record is None
): # We send this as a sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Empty:
pass
except Exception:
import sys, traceback
print("Whoops! Problem:", file=sys.stderr)
traceback.print_exc(file=sys.stderr)
if __name__ == "__main__":
listener_configurer()
logger = logging.getLogger("A")
logger.warning("Logger Active!")
work_queue = Queue(5)
log_queue = Queue(100)
# start the logging listener
listener = Process(target=listener_process, args=(log_queue, listener_configurer))
listener.start()
# make workers
num_workers = 2
workers = []
for i in range(num_workers):
w = Worker(
work_queue,
log_queue=log_queue,
worker_configurer=worker_configurer,
)
w.start()
workers.append(w)
logger.info(f"worker {i} created")
num_jobs = 10
jobs_assigned = 0
while jobs_assigned < num_jobs:
try:
work_queue.put(random.random() * 2, timeout=0.1)
jobs_assigned += 1
except Full:
pass
print("Call it a day and send stop sentinel to everybody")
for i in range(num_workers):
work_queue.put(None)
log_queue.put(None)
for w in workers:
w.join()
print("another worker retired!")
listener.join()
configured worker 0 with logger A.worker.0 with handlers: [<QueueHandler (NOTSET)>]
self.logger handlers during init: [<QueueHandler (NOTSET)>]
configured worker 1 with logger A.worker.1 with handlers: [<QueueHandler (NOTSET)>]
self.logger handlers during init: [<QueueHandler (NOTSET)>]
bagged a message from the queue
bagged a message from the queue
(inside run): self.logger name: A.worker.1, handlers: []
(inside run): self.logger name: A.worker.0, handlers: []
worker 0 finished job of length 1.2150712953970373
worker 1 finished job of length 1.2574239731920005
worker 1 finished job of length 0.11736058130132943
Call it a day and send stop sentinel to everybody
worker 0 finished job of length 0.4843796181316009
worker 1 finished job of length 1.048915894468737
bagged a message from the queue
worker 0 finished job of length 1.2749454212499574
worker 0 finished job of length 0.7298640313585205
worker 1 finished job of length 1.6144333153092076
worker 1 finished job of length 1.219077068714904
Worker 1 received stop signal
worker 0 finished job of length 1.561689295025705
Worker 0 received stop signal
another worker retired!
another worker retired!
08-May-24 17:33:15 | A | WARNING | Logger Active!
08-May-24 17:33:15 | A.worker.0 | INFO | worker 0 initialized
08-May-24 17:33:15 | A | INFO | worker 0 created
08-May-24 17:33:15 | A.worker.1 | INFO | worker 1 initialized
08-May-24 17:33:15 | A | INFO | worker 1 created
08-May-24 17:33:15 | A.worker.0 | INFO | worker 0 initialized
08-May-24 17:33:15 | A.worker.1 | INFO | worker 1 initialized
multiprocessing.set_start_method('fork')
您的代码在主进程内初始化记录器处理程序,而不是新进程。
典当 ...子进程只会继承运行进程对象的 run() 方法所需的资源。特别是,来自父进程的不必要的文件描述符和句柄将不会被继承。
叉子 ...父进程的所有资源都由子进程继承。