我正在从上下文管理器启动一个守护线程,该线程应该每秒发送一次心跳,但由于它在线程中运行,因此如果发生异常,它不会终止上下文管理器。当心跳停止时,如何在上下文管理器中引发异常?
from contextlib import contextmanager
from threading import Thread, Event
from time import sleep
@contextmanager
def plc():
stop_event = Event()
try:
# Send heartbeat every second
hb_t = Thread(target=heartbeat_task,
args=(stop_event,),
daemon=True)
hb_t.start()
yield
except Exception:
raise
finally:
stop_event.set()
hb_t.join()
print("Heartbeat stopped")
def heartbeat_task(stop_event):
value = False
while not stop_event.is_set():
value = not value
print("Heartbeat: " + str(value))
sleep(1)
def main():
with plc():
while True:
print("Program running")
sleep(5)
if __name__ == '__main__':
main()
我很难找到这方面的例子。
感谢您的帮助!
这毫无意义,但可能会帮助您了解如何解决这个问题。
此技术的一个优点是可以从引用 PLC 类实例的任何地方调用 stop() 函数。因此,您有两种方法来停止底层(心跳)线程。
在这里使用上下文管理器确实没有任何价值。我之所以将其包括在内,是因为这是一个明显的要求。
from threading import Thread, Event
from time import sleep
from random import randint
class PLC(Thread):
def __init__(self):
self._event = Event()
super().__init__()
def __enter__(self):
return self
def __exit__(self, *_):
pass
def run(self):
while not self._event.is_set():
sleep(1)
if randint(1, 10) == 1: # simulate exception
# this will break the while loop and the thread will terminate
self.stop()
def stop(self):
self._event.set()
if __name__ == "__main__":
with PLC() as plc:
plc.start()
plc.join()
如果你是一个多线程池(我们只需要池中的一个线程),那么主线程捕获提交到池中的任务抛出的异常就变得很简单:当
multiprocessing.pool.ThreadPool.apply_async
被称为 multiprocessing.pool.AsyncResult
实例时返回代表未来的完成。当在此实例上调用方法 get
时,您可以从工作函数 (heartbeat_task
) 获取返回值,或者重新引发工作函数抛出的任何异常。但我们也可以使用方法 wait
来等待提交任务的完成或经过的时间。然后我们可以用方法ready
测试等待5秒后提交的任务是否真正完成(由于异常或返回)。如果任务仍在运行,那么我们可以告诉它停止。在此演示中,我强制任务在大约 3 秒后引发异常:
from contextlib import contextmanager
from threading import Event
from multiprocessing.pool import ThreadPool
from time import sleep
@contextmanager
def plc():
stop_event = Event()
pool = ThreadPool(1)
try:
# Send heartbeat every second
async_result = pool.apply_async(heartbeat_task, args=(stop_event,))
yield stop_event, async_result
except Exception as e:
print("Got exception:", e)
finally:
pool.close()
pool.join()
print("Heartbeat stopped")
def heartbeat_task(stop_event):
value = False
n = 0
while not stop_event.is_set():
value = not value
print("Heartbeat: " + str(value))
sleep(1)
n += 1
if n == 3:
raise Exception('Oops!')
def main():
with plc() as tpl:
stop_event, async_result = tpl # Unpack
# Wait up to 5 seconds for task to complete:
async_result.wait(5)
if not async_result.ready():
# Task is still running, so tell it to stop:
stop_event.set()
# Wait for task to complete. If the task raised an exception
# it will be rethrown and caught in our context manager:
async_result.get()
if __name__ == '__main__':
main()
打印:
Heartbeat: True
Heartbeat: False
Heartbeat: True
Got exception: Oops!
Heartbeat stopped