python 线程中的垃圾收集

问题描述 投票:0回答:1

当实现一个旨在定期从流中读取的线程时,我无法设法使线程正确停止。只有当我使用的回调函数被实现为代理的方法时才会出现这种情况 (

Worker
)。请参阅此示例(python v3.10.11):

import threading
from time import sleep
import weakref


class Consumer(threading.Thread):
    """This class periodically reads from a stream."""

    def __init__(self, stream_key, callback):
        super().__init__()

        self._stream_key: str = stream_key
        self._handlers = {callback}

        self._running = True

    def run(self):
        """Poll the event stream and call each handler with each event item returned."""
        counter = 0
        while self._running:
            for number, handler in enumerate(self._handlers):
                handler(number, counter)
                print("reading from stream: ", self._stream_key)
                counter += 1
                sleep(2)

    def stop(self):
        """Stop polling the event stream."""
        self._running = False
        self.join()

    def start(self) -> None:
        self._running = True
        return super().start()

    def add_handler(self, callback):
        self._handlers.add(callback)

    def remove_handler(self, callback):
        self._handlers.remove(callback)


class EventHandler:
    def __init__(self):
        self.consumers = weakref.WeakValueDictionary()

    def subscribe(self, stream_key: str, callback):
        if stream_key in self.consumers:
            self.consumers[stream_key].add_handler(callback)
        else:
            consumer = Consumer(stream_key=stream_key, callback=callback)
            self.consumers[stream_key] = consumer
            self.consumers[stream_key].start()

    def __del__(self):
        for consumer in self.consumers.values():
            consumer.stop()


class Worker:
    def __init__(self) -> None:
        self._eventhandler = EventHandler()
        self.registered = False
        self._subscriptions = {("test-stream-key", self.handlerfunc)}

    def register(self):
        self._start_listeners()
        self.registered = True

    def _start_listeners(self):
        for subscription in self._subscriptions:
            self._eventhandler.subscribe(*subscription)

    def handlerfunc(self, number, counter):
        print(f"handler {number} doing things, counting: {counter}")


worker = Worker()


worker.register()


del worker

它不断产生像

这样的输出
reading from stream:  test-stream-key
handler 0 doing things, counting: 1
reading from stream:  test-stream-key
handler 0 doing things, counting: 2
...

del
命令之后,我希望垃圾收集发挥它的魔力,从而停止代理(包括
EventHandler
也有一个
__del__
方法)。

有趣的是,如果我没有将

handlerfunc
定义为
Worker
的方法,但在全局范围内,这就可以正常工作:

import threading
from time import sleep
import weakref


class Consumer(threading.Thread):
    """This class periodically reads from a stream."""

    def __init__(self, stream_key, callback):
        super().__init__()

        self._stream_key: str = stream_key
        self._handlers = {callback}

        self._running = True

    def run(self):
        """Poll the event stream and call each handler with each event item returned."""
        counter = 0
        while self._running:
            for number, handler in enumerate(self._handlers):
                handler(number, counter)
                print("reading from stream: ", self._stream_key)
                counter += 1
                sleep(2)

    def stop(self):
        """Stop polling the event stream."""
        self._running = False
        self.join()

    def start(self) -> None:
        self._running = True
        return super().start()

    def add_handler(self, callback):
        self._handlers.add(callback)

    def remove_handler(self, callback):
        self._handlers.remove(callback)


class EventHandler:
    def __init__(self):
        self.consumers = weakref.WeakValueDictionary()

    def subscribe(self, stream_key: str, callback):
        if stream_key in self.consumers:
            self.consumers[stream_key].add_handler(callback)
        else:
            consumer = Consumer(stream_key=stream_key, callback=callback)
            self.consumers[stream_key] = consumer
            self.consumers[stream_key].start()

    def __del__(self):
        for consumer in self.consumers.values():
            consumer.stop()


class Worker:
    def __init__(self) -> None:
        self._eventhandler = EventHandler()
        self.registered = False
        self._subscriptions = {("test-stream-key", handlerfunc)}

    def register(self):
        self._start_listeners()
        self.registered = True

    def _start_listeners(self):
        for subscription in self._subscriptions:
            self._eventhandler.subscribe(*subscription)


def handlerfunc(number, counter):
    print(f"handler {number} doing things, counting: {counter}")


worker = Worker()


worker.register()


del worker

在那种情况下,它会在一条消息后或多或少立即停止。这也是我对类作用域方法的期望。

这里发生了什么?使用

weakref.WeakValueDictionary()
是否正确? (显然不是)但是至少使用
weakref
的想法是正确的吗?

python multithreading python-multithreading weak-references
1个回答
-1
投票

你的类是强耦合的——它们在它们的方法中使用了另一个类的方法。你需要打破这些联系。否则,当

del worker
引用被删除时,该对象不会被垃圾回收,因为它仍然被子线程引用。你可以这样停止执行:

worker = Worker()

worker._eventhandler.register()

worker._eventhandler.__del__()

© www.soinside.com 2019 - 2024. All rights reserved.