如何在Python中使用异步迭代器实现观察者模式?

问题描述 投票:0回答:1

我正在努力使用 asyncio 和异步迭代器在 Python 中实现观察者模式。目标是创建一个“更改流”,任务可以在其中添加更改,其他任务可以作为异步迭代器订阅这些更改。我正在尝试创建一个类似于 Dart 中的广播流的界面。

这是我迄今为止所拥有的简化版本:

from asyncio import Condition

class ChangeStream:
    def __init__(self):
        self._condition = Condition()
        self._change = None

    async def add_change(self, change):
        async with self._condition:
            self._change = change
            self._condition.notify_all()
    
    async def __aiter__(self):
        async with self._condition:
            while True:
                await self._condition.wait()
                yield self._change

这是实现观察者模式的最佳方法吗?是否有更有效或更容易推理的方法来实现这一点?

编辑:这里的目标是观察者可以看到订阅后的所有更改,但看不到订阅前的任何更改。

python asynchronous async-await python-asyncio
1个回答
0
投票

从您的描述和评论来看,您似乎确实在寻找发布-订阅模式而不是观察者模式。它们很相似,但观察者通常被主题所知并直接获取更新,在发布-订阅中,发布者发布到跟踪对数据感兴趣的消费者的总线(如您的 Stream)。

这是一个基于

Queue
的实现的示例,以及一些额外的代码来展示它是如何工作的 - 当然,您可以以不同的方式使用该类,
TaskGroup
只是完成某些任务的一种方式:

from asyncio import Queue, TaskGroup, sleep, run
from random import random


class ChangeStream:
    def __init__(self):
        self._subscribers = []

    async def add_change(self, change):
        for queue in self._subscribers:
            await queue.put(change)

    async def __aiter__(self):
        queue = Queue()
        self._subscribers.append(queue)
        try:
            while True:
                yield (value := await queue.get())
                if value is None:
                    raise GeneratorExit
        except GeneratorExit:
            self._subscribers.remove(queue)


async def produce_changes(stream: ChangeStream):
    for i in range(10):
        await sleep(random() * 3)
        print(f"Add value: {i}")
        await stream.add_change(i)
    print("Done adding values, writing None to stream.")
    await stream.add_change(None)


async def consume_changes(stream: ChangeStream, name: str, start_delay: int):
    await sleep(start_delay)
    print(f"{name} starting...")
    async for value in stream:
        print(f"{name} received: {value}")
        if value is None:
            break


async def main():
    # create a stream, pass it to a producer task to publish to
    stream = ChangeStream()

    async with TaskGroup() as tg:
        tg.create_task(produce_changes(stream))
        tg.create_task(consume_changes(stream, 'consumer 1', 0))
        tg.create_task(consume_changes(stream, 'consumer 2', 5))  # start after 5 seconds


if __name__ == "__main__":
    run(main())

输出(示例,由于随机性):

consumer 1 starting...
Add value: 0
consumer 1 received: 0
Add value: 1
consumer 1 received: 1
Add value: 2
consumer 1 received: 2
consumer 2 starting...
Add value: 3
consumer 1 received: 3
consumer 2 received: 3
Add value: 4
consumer 1 received: 4
consumer 2 received: 4
Add value: 5
consumer 1 received: 5
consumer 2 received: 5
Add value: 6
consumer 1 received: 6
consumer 2 received: 6
Add value: 7
consumer 1 received: 7
consumer 2 received: 7
Add value: 8
consumer 1 received: 8
consumer 2 received: 8
Add value: 9
Done adding values, writing None to stream.
consumer 1 received: 9
consumer 1 received: None
consumer 2 received: 9
consumer 2 received: None
© www.soinside.com 2019 - 2024. All rights reserved.