上下文管理器和守护线程

问题描述 投票:0回答:2

我正在从上下文管理器启动一个守护线程,该线程应该每秒发送一次心跳,但由于它在线程中运行,因此如果发生异常,它不会终止上下文管理器。当心跳停止时,如何在上下文管理器中引发异常?

from contextlib import contextmanager
from threading import Thread, Event
from time import sleep


@contextmanager
def plc():
    stop_event = Event()

    try:
        # Send heartbeat every second
        hb_t = Thread(target=heartbeat_task,
                      args=(stop_event,),
                      daemon=True)
        hb_t.start()

        yield
    except Exception:
        raise
    finally:
        stop_event.set()
        hb_t.join()
        print("Heartbeat stopped")


def heartbeat_task(stop_event):

    value = False
    
    while not stop_event.is_set():

        value = not value

        print("Heartbeat: " + str(value))

        sleep(1)


def main():

    with plc():

        while True:

            print("Program running")

            sleep(5)

if __name__ == '__main__':
    main()

我很难找到这方面的例子。

感谢您的帮助!

python multithreading contextmanager
2个回答
0
投票

这毫无意义,但可能会帮助您了解如何解决这个问题。

此技术的一个优点是可以从引用 PLC 类实例的任何地方调用 stop() 函数。因此,您有两种方法来停止底层(心跳)线程。

在这里使用上下文管理器确实没有任何价值。我之所以将其包括在内,是因为这是一个明显的要求。

from threading import Thread, Event
from time import sleep
from random import randint

class PLC(Thread):
    def __init__(self):
        self._event = Event()
        super().__init__()
    def __enter__(self):
        return self  
    def __exit__(self, *_):
        pass
    def run(self):
        while not self._event.is_set():
            sleep(1)
            if randint(1, 10) == 1: # simulate exception
                # this will break the while loop and the thread will terminate
                self.stop()
    def stop(self):
        self._event.set()

if __name__ == "__main__":
    with PLC() as plc:
        plc.start()
        plc.join()

0
投票

如果你是一个多线程池(我们只需要池中的一个线程),那么主线程捕获提交到池中的任务抛出的异常就变得很简单:当

multiprocessing.pool.ThreadPool.apply_async
被称为
multiprocessing.pool.AsyncResult
实例时返回代表未来的完成。当在此实例上调用方法
get
时,您可以从工作函数 (
heartbeat_task
) 获取返回值,或者重新引发工作函数抛出的任何异常。但我们也可以使用方法
wait
来等待提交任务的完成或经过的时间。然后我们可以用方法
ready
测试等待5秒后提交的任务是否真正完成(由于异常或返回)。如果任务仍在运行,那么我们可以告诉它停止。在此演示中,我强制任务在大约 3 秒后引发异常:

from contextlib import contextmanager
from threading import Event
from multiprocessing.pool import ThreadPool
from time import sleep


@contextmanager
def plc():
    stop_event = Event()
    pool = ThreadPool(1)

    try:
        # Send heartbeat every second
        async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
        yield stop_event, async_result
    except Exception as e:
        print("Got exception:", e)
    finally:
        pool.close()
        pool.join()
        print("Heartbeat stopped")


def heartbeat_task(stop_event):
    value = False

    n = 0
    while not stop_event.is_set():
        value = not value
        print("Heartbeat: " + str(value))
        sleep(1)
        n += 1
        if n == 3:
            raise Exception('Oops!')


def main():
    with plc() as tpl:
        stop_event, async_result = tpl  # Unpack
        # Wait up to 5 seconds for task to complete:
        async_result.wait(5)
        if not async_result.ready():
            # Task is still running, so tell it to stop:
            stop_event.set()
        # Wait for task to complete. If the task raised an exception
        # it will be rethrown and caught in our context manager:
        async_result.get()

if __name__ == '__main__':
    main()

打印:

Heartbeat: True
Heartbeat: False
Heartbeat: True
Got exception: Oops!
Heartbeat stopped
© www.soinside.com 2019 - 2024. All rights reserved.