我试图用Python生成器重现反应扩展“共享”可观察概念。
假设我有一个API,它给了我一个无限的流,我可以这样使用:
def my_generator():
for elem in the_infinite_stream():
yield elem
我可以多次使用这个发生器:
stream1 = my_generator()
stream2 = my_generator()
并且the_infinite_stream()
将被调用两次(每个发生器一次)。
现在说the_infinite_stream()
是一项昂贵的操作。有没有办法在多个客户端之间“共享”生成器?似乎tee会这样做,但我必须提前知道我想要多少个独立的发电机。
这个想法是在其他语言(Java,Swift)中使用反应式扩展(RxJava,RxSwift)“共享”流,我可以方便地在客户端复制流。我想知道如何在Python中这样做。
注意:我使用的是asyncio
我采用了tee
实现并对其进行了修改,因此您可以从infinite_stream
获得各种数量的生成器:
import collections
def generators_factory(iterable):
it = iter(iterable)
deques = []
already_gone = []
def new_generator():
new_deque = collections.deque()
new_deque.extend(already_gone)
deques.append(new_deque)
def gen(mydeque):
while True:
if not mydeque: # when the local deque is empty
newval = next(it) # fetch a new value and
already_gone.append(newval)
for d in deques: # load it to all the deques
d.append(newval)
yield mydeque.popleft()
return gen(new_deque)
return new_generator
# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1
要缓存一些数量的值,您可以将already_gone = []
更改为already_gone = collections.deque(maxlen=size)
并将size=None
参数添加到generators_factory
。
考虑简单的class attributes。
特定
def infinite_stream():
"""Yield a number from a (semi-)infinite iterator."""
# Alternatively, `yield from itertools.count()`
yield from iter(range(100000000))
# Helper
def get_data(iterable):
"""Print the state of `data` per stream."""
return ", ".join([f"{x.__name__}: {x.data}" for x in iterable])
码
class SharedIterator:
"""Share the state of an iterator with subclasses."""
_gen = infinite_stream()
data = None
@staticmethod
def modify():
"""Advance the shared iterator + assign new data."""
cls = SharedIterator
cls.data = next(cls._gen)
演示
鉴于客户streams
(A
,B
和C
)的元组,
# Streams
class A(SharedIterator): pass
class B(SharedIterator): pass
class C(SharedIterator): pass
streams = A, B, C
让我们修改并打印它们之间共享的一个迭代器的状态:
# Observe changed state in subclasses
A.modify()
print("1st access:", get_data(streams))
B.modify()
print("2nd access:", get_data(streams))
C.modify()
print("3rd access:", get_data(streams))
产量
1st access: A: 0, B: 0, C: 0
2nd access: A: 1, B: 1, C: 1
3rd access: A: 2, B: 2, C: 2
虽然任何流都可以修改迭代器,但class属性在子类之间共享。
也可以看看
您可以反复调用“tee”以根据需要创建多个迭代器。
it = iter([ random.random() for i in range(100)])
base, it_cp = itertools.tee(it)
_, it_cp2 = itertools.tee(base)
_, it_cp3 = itertools.tee(base)
您可以使用单个生成器和“订户生成器”:
subscribed_generators = []
def my_generator():
while true:
elem = yield
do_something(elem) # or yield do_something(elem) depending on your actual use
def publishing_generator():
for elem in the_infinite_stream():
for generator in subscribed_generators:
generator.send(elem)
subscribed_generators.extend([my_generator(), my_generator()])
# Next is just ane example that forces iteration over `the_infinite_stream`
for elem in publishing_generator():
pass
您也可以使用方法创建一个类,而不是生成器函数:__next__
,__iter__
,send
,throw
。这样你就可以修改MyGenerator.__init__
方法,自动将它的新实例添加到subscribed_generators
。
这有点类似于基于事件的方法,具有“哑实现”:
for elem in the_infinite_stream
类似于发射事件for generator ...: generator.send
类似于向每个订阅者发送事件。因此,实现“更复杂但结构化的解决方案”的一种方法是使用基于事件的方法:
the_infinite_stream
为每个元素发出事件,并且您的my_generator
实例应该订阅这些事件。也可以使用其他方法,最佳选择取决于:您的任务细节,如何在asyncio中使用事件循环。例如:
the_infinite_stream
(或它的包装器)实现为具有“游标”的类(用于跟踪不同订户的流中当前位置的对象);然后每个my_generator
注册新光标并使用它来获取无限流中的下一个项目。在这种方法中,事件循环不会自动重新访问my_generator
实例,如果这些实例“不相等”,可能需要这些实例(例如,具有一些“优先级平衡”)my_generator
的所有实例(如前所述)。在这种方法中,my_generator
的每个实例都由事件循环自动重新访问。这种方法很可能是线程安全的。asyncio.Event
。类似于使用中间发电机。不是线程安全的
aiopubsub。
使用Observer pattern的东西the_infinite_generator
(或它的包装)成为“高速缓存”最新事件的“Singleton”。其他答案中描述了一些方法。可以使用另一种“缓存”解决方案:
为the_infinite_generator
的每个实例发出一次相同的元素(使用跟踪实例的自定义__new__
方法的类,或者使用具有返回“移位”迭代器而不是the_infinite_loop
的方法的类的相同实例),直到有人在the_infinite_generator
实例上调用特殊方法(或在课堂上):infinite_gen.next_cycle
。在这种情况下应该总是有一些“最后的最终生成器/处理器”,在每个事件循环的周期结束时将执行the_infinite_generator().next_cycle()
与之前类似,但同一事件允许在同一个my_generator
实例中多次触发(因此他们应该注意这种情况)。在这种方法中,the_infinite_generator().next_cycle()
可以被称为“定期”与loop.call_later or loop.cal_at。如果“订户”应该能够处理/分析:延迟,速率限制,事件之间的超时等,则可能需要这种方法。the_infinite_loop
的生成器的期望行为是什么,很难提出具体的内容如果我理解你对“共享”流的描述,你真的需要“一个”the_infinite_stream
生成器和它的“处理程序”。尝试执行此操作的示例:
class StreamHandler:
def __init__(self):
self.__real_stream = the_infinite_stream()
self.__sub_streams = []
def get_stream(self):
sub_stream = [] # or better use some Queue/deque object. Using list just to show base principle
self.__sub_streams.append(sub_stream)
while True:
while sub_stream:
yield sub_stream.pop(0)
next(self)
def __next__(self):
next_item = next(self.__real_stream)
for sub_stream in self.__sub_steams:
sub_stream.append(next_item)
some_global_variable = StreamHandler()
# Or you can change StreamHandler.__new__ to make it singleton, or you can create an instance at the point of creation of event-loop
def my_generator():
for elem in some_global_variable.get_stream():
yield elem
但是如果所有的my_generator
对象都在无限流的同一点初始化,并且在循环内“同样”迭代,那么这种方法将为每个“sub_stream”(用作队列)引入“不必要的”内存开销。不必要的:因为这些队列总是相同的(但可以优化:如果存在一些现有的“空”子流,则可以将其重新用于新的子流,并对“pop
-logic”进行一些更改)。还可以讨论许多其他实现和细微差别