运行时错误:内部可重入调用<_io.BufferedWriter name='<stdout>'>

问题描述 投票:0回答:1

我正在编写一个程序,该程序启动一个线程来生成“工作”并每 N 秒将其添加到队列中。然后,我有一个线程池来处理队列中的项目。

下面的程序运行得很好,直到我注释掉/删除第 #97 行(在主函数中

time.sleep(0.5)
)。一旦我这样做,它会生成一个 RuntimeError ,它试图正常停止程序(通过向主进程发送 SIGINT 或 SIGTERM )。它甚至可以在 0.1 秒这样极短的睡眠时间下正常工作,但完全没有睡眠时间就会出现问题。

我尝试研究“重入性”,但不幸的是它有点超出了我的理解范围。

谁能帮助我理解这一点?

代码:

import random
import signal
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from datetime import datetime
from queue import Empty, Queue, SimpleQueue
from typing import Any


class UniqueQueue:
    """
    A thread safe queue which can only ever contain unique items.
    """

    def __init__(self) -> None:
        self._q = Queue()
        self._items = []
        self._l = threading.Lock()

    def get(self, block: bool = False, timeout: float | None = None) -> Any:
        with self._l:
            try:
                item = self._q.get(block=block, timeout=timeout)
            except Empty:
                raise
            else:
                self._items.pop(0)
                return item

    def put(self, item: Any, block: bool = False, timeout: float | None = None) -> None:
        with self._l:
            if item in self._items:
                return None
            self._items.append(item)
            self._q.put(item, block=block, timeout=timeout)

    def size(self) -> int:
        return self._q.qsize()

    def empty(self) -> bool:
        return self._q.empty()


def stop_app(sig_num, sig_frame) -> None:
    # global stop_app_event
    print("Signal received to stop the app")
    stop_app_event.set()


def work_generator(q: UniqueQueue) -> None:
    last_execution = time.time()
    is_first_execution = True
    while not stop_app_event.is_set():
        elapsed_seconds = int(time.time() - last_execution)
        if elapsed_seconds <= 10 and not is_first_execution:
            time.sleep(0.5)
            continue
        last_execution = time.time()
        is_first_execution = False
        print("Generating work...")
        for _ in range(100):
            q.put({"n": random.randint(0, 500)})


def print_work(w) -> None:
    print(f"{datetime.now()}: {w}")


def main():
    # Create a work queue
    work_queue = UniqueQueue()

    # Create a thread to generate the work and add to the queue
    t = threading.Thread(target=work_generator, args=(work_queue,))
    t.start()

    # Create a thread pool, get work from the queue, and submit to the pool for processing
    pool = ThreadPoolExecutor(max_workers=20)
    futures: list[Future] = []
    while True:
        print("Processing work...")
        if stop_app_event.is_set():
            print("stop_app_event is set:", stop_app_event.is_set())
            for future in futures:
                future.cancel()
            break
        print("Queue Size:", work_queue.size())
        try:
            while not work_queue.empty():
                work = work_queue.get()
                future = pool.submit(print_work, work)
                futures.append(future)
        except Empty:
            pass
        time.sleep(0.5)

    print("Stopping the work generator thread...")
    t.join(timeout=10)
    print("Work generator stopped")
    print("Stopping the thread pool...")
    pool.shutdown(wait=True)
    print("Thread pool stopped")


if __name__ == "__main__":
    stop_app_event = threading.Event()
    signal.signal(signalnum=signal.SIGINT, handler=stop_app)
    signal.signal(signalnum=signal.SIGTERM, handler=stop_app)
    main()
python python-3.x python-multithreading
1个回答
2
投票

这是因为你在信号处理程序中调用了

print()
stop_app()

C 中信号处理程序在后台线程中执行,但在 Python 中它在主线程中执行。 (参见参考。)在您的情况下,在执行

print()
调用时,另一个
print()
被调用,术语“可重入”在这里非常适合。并且当前的IO堆栈禁止重入调用。(如果您有兴趣,请参阅实现。)

您可以使用

os.write()
sys.stdout
来解决此问题,如下所示。

import sys
import os
...
def stop_app(sig_num, sig_frame):
    os.write(sys.stdout.fileno(), b"Signal received to stop the app\n")
    stop_app_event.set()
© www.soinside.com 2019 - 2024. All rights reserved.