当实现一个旨在定期从流中读取的线程时,我无法设法使线程正确停止。只有当我使用的回调函数被实现为代理的方法时才会出现这种情况 (
Worker
)。请参阅此示例(python v3.10.11):
import threading
from time import sleep
import weakref
class Consumer(threading.Thread):
"""This class periodically reads from a stream."""
def __init__(self, stream_key, callback):
super().__init__()
self._stream_key: str = stream_key
self._handlers = {callback}
self._running = True
def run(self):
"""Poll the event stream and call each handler with each event item returned."""
counter = 0
while self._running:
for number, handler in enumerate(self._handlers):
handler(number, counter)
print("reading from stream: ", self._stream_key)
counter += 1
sleep(2)
def stop(self):
"""Stop polling the event stream."""
self._running = False
self.join()
def start(self) -> None:
self._running = True
return super().start()
def add_handler(self, callback):
self._handlers.add(callback)
def remove_handler(self, callback):
self._handlers.remove(callback)
class EventHandler:
def __init__(self):
self.consumers = weakref.WeakValueDictionary()
def subscribe(self, stream_key: str, callback):
if stream_key in self.consumers:
self.consumers[stream_key].add_handler(callback)
else:
consumer = Consumer(stream_key=stream_key, callback=callback)
self.consumers[stream_key] = consumer
self.consumers[stream_key].start()
def __del__(self):
for consumer in self.consumers.values():
consumer.stop()
class Worker:
def __init__(self) -> None:
self._eventhandler = EventHandler()
self.registered = False
self._subscriptions = {("test-stream-key", self.handlerfunc)}
def register(self):
self._start_listeners()
self.registered = True
def _start_listeners(self):
for subscription in self._subscriptions:
self._eventhandler.subscribe(*subscription)
def handlerfunc(self, number, counter):
print(f"handler {number} doing things, counting: {counter}")
worker = Worker()
worker.register()
del worker
它不断产生像
这样的输出reading from stream: test-stream-key
handler 0 doing things, counting: 1
reading from stream: test-stream-key
handler 0 doing things, counting: 2
...
在
del
命令之后,我希望垃圾收集发挥它的魔力,从而停止代理(包括 EventHandler
也有一个 __del__
方法)。
有趣的是,如果我没有将
handlerfunc
定义为 Worker
的方法,但在全局范围内,这就可以正常工作:
import threading
from time import sleep
import weakref
class Consumer(threading.Thread):
"""This class periodically reads from a stream."""
def __init__(self, stream_key, callback):
super().__init__()
self._stream_key: str = stream_key
self._handlers = {callback}
self._running = True
def run(self):
"""Poll the event stream and call each handler with each event item returned."""
counter = 0
while self._running:
for number, handler in enumerate(self._handlers):
handler(number, counter)
print("reading from stream: ", self._stream_key)
counter += 1
sleep(2)
def stop(self):
"""Stop polling the event stream."""
self._running = False
self.join()
def start(self) -> None:
self._running = True
return super().start()
def add_handler(self, callback):
self._handlers.add(callback)
def remove_handler(self, callback):
self._handlers.remove(callback)
class EventHandler:
def __init__(self):
self.consumers = weakref.WeakValueDictionary()
def subscribe(self, stream_key: str, callback):
if stream_key in self.consumers:
self.consumers[stream_key].add_handler(callback)
else:
consumer = Consumer(stream_key=stream_key, callback=callback)
self.consumers[stream_key] = consumer
self.consumers[stream_key].start()
def __del__(self):
for consumer in self.consumers.values():
consumer.stop()
class Worker:
def __init__(self) -> None:
self._eventhandler = EventHandler()
self.registered = False
self._subscriptions = {("test-stream-key", handlerfunc)}
def register(self):
self._start_listeners()
self.registered = True
def _start_listeners(self):
for subscription in self._subscriptions:
self._eventhandler.subscribe(*subscription)
def handlerfunc(number, counter):
print(f"handler {number} doing things, counting: {counter}")
worker = Worker()
worker.register()
del worker
在那种情况下,它会在一条消息后或多或少立即停止。这也是我对类作用域方法的期望。
这里发生了什么?使用
weakref.WeakValueDictionary()
是否正确? (显然不是)但是至少使用weakref
的想法是正确的吗?
你的类是强耦合的——它们在它们的方法中使用了另一个类的方法。你需要打破这些联系。否则,当
del worker
引用被删除时,该对象不会被垃圾回收,因为它仍然被子线程引用。你可以这样停止执行:
worker = Worker()
worker._eventhandler.register()
worker._eventhandler.__del__()