共享python生成器

问题描述 投票:17回答:4

我试图用Python生成器重现反应扩展“共享”可观察概念。

假设我有一个API,它给了我一个无限的流,我可以这样使用:

def my_generator():
    for elem in the_infinite_stream():
        yield elem

我可以多次使用这个发生器:

stream1 = my_generator()
stream2 = my_generator()

并且the_infinite_stream()将被调用两次(每个发生器一次)。

现在说the_infinite_stream()是一项昂贵的操作。有没有办法在多个客户端之间“共享”生成器?似乎tee会这样做,但我必须提前知道我想要多少个独立的发电机。

这个想法是在其他语言(Java,Swift)中使用反应式扩展(RxJava,RxSwift)“共享”流,我可以方便地在客户端复制流。我想知道如何在Python中这样做。

注意:我使用的是asyncio

python generator python-asyncio
4个回答
9
投票

我采用了tee实现并对其进行了修改,因此您可以从infinite_stream获得各种数量的生成器:

import collections

def generators_factory(iterable):
    it = iter(iterable)
    deques = []
    already_gone = []

    def new_generator():
        new_deque = collections.deque()
        new_deque.extend(already_gone)
        deques.append(new_deque)

        def gen(mydeque):
            while True:
                if not mydeque:             # when the local deque is empty
                    newval = next(it)       # fetch a new value and
                    already_gone.append(newval)
                    for d in deques:        # load it to all the deques
                        d.append(newval)
                yield mydeque.popleft()

        return gen(new_deque)

    return new_generator

# test it:
infinite_stream = [1, 2, 3, 4, 5]
factory = generators_factory(infinite_stream)
gen1 = factory()
gen2 = factory()
print(next(gen1)) # 1
print(next(gen2)) # 1 even after it was produced by gen1
print(list(gen1)) # [2, 3, 4, 5] # the rest after 1

要缓存一些数量的值,您可以将already_gone = []更改为already_gone = collections.deque(maxlen=size)并将size=None参数添加到generators_factory


4
投票

考虑简单的class attributes

特定

def infinite_stream():
    """Yield a number from a (semi-)infinite iterator."""
    # Alternatively, `yield from itertools.count()`
    yield from iter(range(100000000))


# Helper
def get_data(iterable):
    """Print the state of `data` per stream."""
    return ", ".join([f"{x.__name__}: {x.data}" for x in iterable])

class SharedIterator:
    """Share the state of an iterator with subclasses."""
    _gen = infinite_stream()
    data = None

    @staticmethod
    def modify():
        """Advance the shared iterator + assign new data."""
        cls = SharedIterator
        cls.data = next(cls._gen)

演示

鉴于客户streamsABC)的元组,

# Streams
class A(SharedIterator): pass
class B(SharedIterator): pass
class C(SharedIterator): pass


streams = A, B, C

让我们修改并打印它们之间共享的一个迭代器的状态:

# Observe changed state in subclasses    
A.modify()
print("1st access:", get_data(streams))
B.modify()
print("2nd access:", get_data(streams))
C.modify()
print("3rd access:", get_data(streams))

产量

1st access: A: 0, B: 0, C: 0
2nd access: A: 1, B: 1, C: 1
3rd access: A: 2, B: 2, C: 2

虽然任何流都可以修改迭代器,但class属性在子类之间共享。

也可以看看

  • Docs上的asyncio.Queue - 共享容器的异步替代品
  • 观察者模式+ Post上的asyncio

4
投票

您可以反复调用“tee”以根据需要创建多个迭代器。

it  = iter([ random.random() for i in range(100)])
base, it_cp = itertools.tee(it)
_, it_cp2 = itertools.tee(base)
_, it_cp3 = itertools.tee(base)

样本:http://tpcg.io/ZGc6l5


4
投票

您可以使用单个生成器和“订户生成器”:

subscribed_generators = []


def my_generator():
    while true:
        elem = yield
        do_something(elem) # or yield do_something(elem) depending on your actual use

def publishing_generator():
    for elem in the_infinite_stream():
        for generator in subscribed_generators:
            generator.send(elem)

subscribed_generators.extend([my_generator(), my_generator()])

# Next is just ane example that forces iteration over `the_infinite_stream`
for elem in publishing_generator():
    pass

您也可以使用方法创建一个类,而不是生成器函数:__next____iter__sendthrow。这样你就可以修改MyGenerator.__init__方法,自动将它的新实例添加到subscribed_generators

这有点类似于基于事件的方法,具有“哑实现”:

  • for elem in the_infinite_stream类似于发射事件
  • for generator ...: generator.send类似于向每个订阅者发送事件。

因此,实现“更复杂但结构化的解决方案”的一种方法是使用基于事件的方法:

  • 例如,您可以使用asyncio.Event
  • 或者像aiopubsub这样的第三方解决方案
  • 对于任何这些方法,您应该从the_infinite_stream为每个元素发出事件,并且您的my_generator实例应该订阅这些事件。

也可以使用其他方法,最佳选择取决于:您的任务细节,如何在asyncio中使用事件循环。例如:

  • 您可以将the_infinite_stream(或它的包装器)实现为具有“游标”的类(用于跟踪不同订户的流中当前位置的对象);然后每个my_generator注册新光标并使用它来获取无限流中的下一个项目。在这种方法中,事件循环不会自动重新访问my_generator实例,如果这些实例“不相等”,可能需要这些实例(例如,具有一些“优先级平衡”)
  • 中间生成器调用my_generator的所有实例(如前所述)。在这种方法中,my_generator的每个实例都由事件循环自动重新访问。这种方法很可能是线程安全的。
  • 基于事件的方法: 使用asyncio.Event。类似于使用中间发电机。不是线程安全的 aiopubsub。 使用Observer pattern的东西
  • 使the_infinite_generator(或它的包装)成为“高速缓存”最新事件的“Singleton”。其他答案中描述了一些方法。可以使用另一种“缓存”解决方案: 为the_infinite_generator的每个实例发出一次相同的元素(使用跟踪实例的自定义__new__方法的类,或者使用具有返回“移位”迭代器而不是the_infinite_loop的方法的类的相同实例),直到有人在the_infinite_generator实例上调用特殊方法(或在课堂上):infinite_gen.next_cycle。在这种情况下应该总是有一些“最后的最终生成器/处理器”,在每个事件循环的周期结束时将执行the_infinite_generator().next_cycle() 与之前类似,但同一事件允许在同一个my_generator实例中多次触发(因此他们应该注意这种情况)。在这种方法中,the_infinite_generator().next_cycle()可以被称为“定期”与loop.call_later or loop.cal_at。如果“订户”应该能够处理/分析:延迟,速率限制,事件之间的超时等,则可能需要这种方法。
  • 许多其他解决方案都是可能如果不查看当前的实现并且不知道使用the_infinite_loop的生成器的期望行为是什么,很难提出具体的内容

如果我理解你对“共享”流的描述,你真的需要“一个”the_infinite_stream生成器和它的“处理程序”。尝试执行此操作的示例:


class StreamHandler:
    def __init__(self):
        self.__real_stream = the_infinite_stream()
        self.__sub_streams = []

    def get_stream(self):
        sub_stream = []  # or better use some Queue/deque object. Using list just to show base principle
        self.__sub_streams.append(sub_stream)
        while True:
            while sub_stream:
                yield sub_stream.pop(0)
            next(self)

    def __next__(self):
        next_item = next(self.__real_stream)
        for sub_stream in self.__sub_steams:
            sub_stream.append(next_item)

some_global_variable = StreamHandler()
# Or you can change StreamHandler.__new__ to make it singleton, or you can create an instance at the point of creation of event-loop

def my_generator():
    for elem in some_global_variable.get_stream():
        yield elem

但是如果所有的my_generator对象都在无限流的同一点初始化,并且在循环内“同样”迭代,那么这种方法将为每个“sub_stream”(用作队列)引入“不必要的”内存开销。不必要的:因为这些队列总是相同的(但可以优化:如果存在一些现有的“空”子流,则可以将其重新用于新的子流,并对“pop-logic”进行一些更改)。还可以讨论许多其他实现和细微差别

© www.soinside.com 2019 - 2024. All rights reserved.