I sometimes use "simple" Python generators or generator expressions to build something resembling a computational graph, for example:
# example 1
w1 = lambda v: v ** 2 # placeholder for expensive operation
w2 = lambda v: v - 3 # placeholder for expensive operation
w3 = lambda v: v / 7 # placeholder for expensive operation
d = [10, 11, 12, 13] # input data, could be "large"
r1 = (w1(x) for x in d) # generator for intermediary result 1
r2 = (w2(x) for x in r1) # generator for intermediary result 2
r3 = [w3(x) for x in r2] # final result
print(r3)
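Imagine that the list d is really large and filled with things bigger than integers. r1 and r2 are chained generators, which saves a lot of memory. My lambdas are simple placeholders for expensive computation or processing steps that yield new, independent intermediary results.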
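To make the memory argument concrete, here is a minimal sketch of example 1 with tracing added. The `traced` helper and the `log` list are hypothetical names introduced only for this illustration. The log shows that each element travels through the whole chain before the next one is read, so only one intermediary value per stage is alive at a time.

```python
log = []  # hypothetical trace of (stage name, input value) pairs


def traced(name, fn):
    """Wrap fn so that every call is recorded in log (illustration only)."""
    def wrapper(v):
        log.append((name, v))
        return fn(v)
    return wrapper


w1 = traced("w1", lambda v: v ** 2)  # placeholder for expensive operation
w2 = traced("w2", lambda v: v - 3)   # placeholder for expensive operation
w3 = traced("w3", lambda v: v / 7)   # placeholder for expensive operation

d = [10, 11]                  # shortened input data
r1 = (w1(x) for x in d)       # generator for intermediary result 1
r2 = (w2(x) for x in r1)      # generator for intermediary result 2
r3 = [w3(x) for x in r2]      # final result, drives the whole chain

# Stages alternate per element: w1, w2, w3 for 10, then w1, w2, w3 for 11
print([name for name, _ in log])
```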
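The cool thing about this approach is that a generator can depend on multiple other generators, for example through the zip function, which technically allows "merging/joining branches of the graph":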
# example 2
wa1 = lambda v: v ** 2 # placeholder for expensive operation
wb1 = lambda v: v ** 3 # placeholder for expensive operation
wm = lambda a, b: a + b # placeholder for expensive operation (MERGE)
w2 = lambda v: v - 3 # placeholder for expensive operation
w3 = lambda v: v / 7 # placeholder for expensive operation
da = [10, 11, 12, 13] # input data "a", could be "large"
db = [20, 21, 22, 23] # input data "b", could be "large"
ra1 = (wa1(x) for x in da) # generator for intermediary result 1a
rb1 = (wb1(x) for x in db) # generator for intermediary result 1b
rm = (wm(x, y) for x, y in zip(ra1, rb1)) # generator for intermediary result rm -> MERGE of "a" and "b"
r2 = (w2(x) for x in rm) # generator for intermediary result 2
r3 = [w3(x) for x in r2] # final result
print(r3)
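There are two data sources, da and db. Their intermediary results are "merged" in rm, although the actual computation is triggered only by computing r3. Everything above it is a generator, computed on demand.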
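The "computed on demand" part can be checked with a small sketch. It assumes a hypothetical `traced` wrapper and `calls` list (neither is part of the examples above) and reuses the merge step from example 2. Building the generators runs none of the placeholders; pulling one item from rm runs exactly one step of each branch plus the merge.

```python
calls = []  # hypothetical record of which placeholder ran, in order


def traced(name, fn):
    """Wrap fn so that every call appends its name to calls (illustration only)."""
    def wrapper(*args):
        calls.append(name)
        return fn(*args)
    return wrapper


wa1 = traced("wa1", lambda v: v ** 2)    # placeholder for expensive operation
wb1 = traced("wb1", lambda v: v ** 3)    # placeholder for expensive operation
wm = traced("wm", lambda a, b: a + b)    # placeholder for expensive operation (MERGE)

da = [10, 11]   # shortened input data "a"
db = [20, 21]   # shortened input data "b"
ra1 = (wa1(x) for x in da)
rb1 = (wb1(x) for x in db)
rm = (wm(x, y) for x, y in zip(ra1, rb1))

calls_before = list(calls)        # snapshot: nothing has been computed yet
first = next(rm)                  # pull a single merged item
calls_after_first = list(calls)   # snapshot: one step per branch, then the merge

print(calls_before, first, calls_after_first)
```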
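Something I have been thinking about for a while is how to reverse this, i.e. how to "split into branches" using generators, without having to keep all intermediary results of one step in memory simultaneously. Consider the following example: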
# example 3
w1 = lambda v: v ** 2 # placeholder for expensive operation
ws = lambda v: (v - 1, v + 1) # placeholder for expensive operation (SPLIT)
w2 = lambda v: v - 3 # placeholder for expensive operation
w3 = lambda v: v / 7 # placeholder for expensive operation
d = [10, 11, 12, 13] # input data, could be "large"
r1 = (w1(x) for x in d) # generator for intermediary result 1
rs = [ws(x) for x in r1] # ???
ra2 = (w2(x) for x, _ in rs) # generator for intermediary result 2
rb2 = (w2(x) for _, x in rs) # generator for intermediary result 2
ra3 = [w3(x) for x in ra2] # final result "a"
rb3 = [w3(x) for x in rb2] # final result "b"
print(ra3, rb3)
The results of generator r1 are required by two different operations, as described in the lambda ws, which also handles the "split into branches".
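My question is: can I replace rs, currently a list comprehension, with something that behaves like a generator, computes every intermediary result only once, but makes it available to multiple generators, e.g. ra2 and rb2, "on demand"? If I have to keep some intermediary results, i.e. elements of rs, cached in memory at any given time, I would be fine with that, just not all of rs as, for example, a list.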
Because the branches in example 3 are symmetric, I could work around the problem like this:
# example 4
w1 = lambda v: v ** 2 # placeholder for expensive operation
ws = lambda v: (v - 1, v + 1) # placeholder for expensive operation (SPLIT)
w2 = lambda v: v - 3 # placeholder for expensive operation
w3 = lambda v: v / 7 # placeholder for expensive operation
d = [10, 11, 12, 13] # input data, could be "large"
r1 = (w1(x) for x in d) # generator for intermediary result 1
rs = (ws(x) for x in r1) # ???
r2 = ((w2(x), w2(y)) for x, y in rs) # generator for intermediary result 2
r3 = [(w3(x), w3(y)) for x, y in r2] # final result
print(r3)
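For more complicated processing pipelines this could become really messy and impractical. For the purpose of this question, let's assume that I really want to keep the intermediary results 2 of branches "a" and "b" separate.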
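My best bad idea so far is to work with threads and queues, because all of this also implicitly raises the question of execution order. In example 3, ra3 will be fully evaluated before rb3 is even touched, which means that all intermediary results from rs must be kept until rb3 can be evaluated. In practice, if I do not want to keep all of rs in memory at the same time, ra3 and rb3 must be evaluated in parallel or alternately. I wonder if there is a better, smarter way to get this done; it smells a lot like some async magic could make sense here.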
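To illustrate what "evaluated alternately" could look like without threads, here is a rough sketch of example 3. It assumes that `itertools.tee` from the standard library is an acceptable way to split rs and that both branches yield the same number of items. The `events` list and the names `rs_a`, `rs_b`, `ra3_gen` and `rb3_gen` are made up for this sketch. It is one possible arrangement, not necessarily the best one.

```python
from itertools import tee

events = []  # hypothetical record of SPLIT calls, to confirm each runs once


def ws(v):
    """Placeholder for expensive operation (SPLIT), with call tracking."""
    events.append(v)
    return (v - 1, v + 1)


w1 = lambda v: v ** 2  # placeholder for expensive operation
w2 = lambda v: v - 3   # placeholder for expensive operation
w3 = lambda v: v / 7   # placeholder for expensive operation

d = [10, 11, 12, 13]              # input data, could be "large"
r1 = (w1(x) for x in d)           # generator for intermediary result 1
rs = (ws(x) for x in r1)          # lazy split, no longer a list
rs_a, rs_b = tee(rs)              # two iterators sharing one underlying rs

ra2 = (w2(x) for x, _ in rs_a)    # branch "a", intermediary result 2
rb2 = (w2(x) for _, x in rs_b)    # branch "b", intermediary result 2
ra3_gen = (w3(x) for x in ra2)    # branch "a", lazy final step
rb3_gen = (w3(x) for x in rb2)    # branch "b", lazy final step

# Alternate between the branches: zip pulls one item from "a", then one
# from "b", so tee never has to buffer more than one element of rs.
ra3, rb3 = [], []
for a, b in zip(ra3_gen, rb3_gen):
    ra3.append(a)
    rb3.append(b)

print(ra3, rb3)
```

If one branch is drained completely before the other is touched, as in example 3, tee has to buffer every element of rs internally, which brings back the memory problem. The alternating loop is what keeps the buffer small.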