我正在努力使用 asyncio 和异步迭代器在 Python 中实现观察者模式。目标是创建一个“更改流”,任务可以在其中添加更改,其他任务可以作为异步迭代器订阅这些更改。我正在尝试创建一个类似于 Dart 中的广播流的界面。
这是我迄今为止所拥有的简化版本:
from asyncio import Condition
class ChangeStream:
def __init__(self):
self._condition = Condition()
self._change = None
async def add_change(self, change):
async with self._condition:
self._change = change
self._condition.notify_all()
async def __aiter__(self):
async with self._condition:
while True:
await self._condition.wait()
yield self._change
这是实现观察者模式的最佳方法吗?是否有更有效或更容易推理的方法来实现这一点?
编辑:这里的目标是观察者可以看到订阅后的所有更改,但看不到订阅前的任何更改。
从您的描述和评论来看,您似乎确实在寻找发布-订阅模式而不是观察者模式。它们很相似,但观察者通常被主题所知并直接获取更新,在发布-订阅中,发布者发布到跟踪对数据感兴趣的消费者的总线(如您的 Stream)。
这是一个基于
Queue
的实现的示例,以及一些额外的代码来展示它是如何工作的 - 当然,您可以以不同的方式使用该类,TaskGroup
只是完成某些任务的一种方式:
from asyncio import Queue, TaskGroup, sleep, run
from random import random
class ChangeStream:
def __init__(self):
self._subscribers = []
async def add_change(self, change):
for queue in self._subscribers:
await queue.put(change)
async def __aiter__(self):
queue = Queue()
self._subscribers.append(queue)
try:
while True:
yield (value := await queue.get())
if value is None:
raise GeneratorExit
except GeneratorExit:
self._subscribers.remove(queue)
async def produce_changes(stream: ChangeStream):
for i in range(10):
await sleep(random() * 3)
print(f"Add value: {i}")
await stream.add_change(i)
print("Done adding values, writing None to stream.")
await stream.add_change(None)
async def consume_changes(stream: ChangeStream, name: str, start_delay: int):
await sleep(start_delay)
print(f"{name} starting...")
async for value in stream:
print(f"{name} received: {value}")
if value is None:
break
async def main():
# create a stream, pass it to a producer task to publish to
stream = ChangeStream()
async with TaskGroup() as tg:
tg.create_task(produce_changes(stream))
tg.create_task(consume_changes(stream, 'consumer 1', 0))
tg.create_task(consume_changes(stream, 'consumer 2', 5)) # start after 5 seconds
if __name__ == "__main__":
run(main())
输出(示例,由于随机性):
consumer 1 starting...
Add value: 0
consumer 1 received: 0
Add value: 1
consumer 1 received: 1
Add value: 2
consumer 1 received: 2
consumer 2 starting...
Add value: 3
consumer 1 received: 3
consumer 2 received: 3
Add value: 4
consumer 1 received: 4
consumer 2 received: 4
Add value: 5
consumer 1 received: 5
consumer 2 received: 5
Add value: 6
consumer 1 received: 6
consumer 2 received: 6
Add value: 7
consumer 1 received: 7
consumer 2 received: 7
Add value: 8
consumer 1 received: 8
consumer 2 received: 8
Add value: 9
Done adding values, writing None to stream.
consumer 1 received: 9
consumer 1 received: None
consumer 2 received: 9
consumer 2 received: None