我正在寻找一种顺序组合两个 itertools 运算符的好方法。例如,假设我们想在超过阈值后从生成器序列中选择小于阈值的数字。对于 12000 的阈值,这些对应于
it.takewhile(lambda x: x<12000)
和 it.takewhile(lambda x: x>=12000)
:
# Set up an example generator:
def lcg(a=75,c=74,m=2**16+1,x0 = 1):
xn = x0
yield xn
while True:
xn = (a*xn+c) % m
yield xn
# First 20 elements:
list(it.islice(lcg(), 20))
[1, # <- start sequence, start it.takewhile(lambda x: x<12000)
149,
11249, # <- last element of it.takewhile(lambda x: x<12000)
57305, # <- start it.takewhile(lambda x: x>=12000) here
38044,
35283,
24819,
26463,
18689,
25472, # <- last element of it.takewhile(lambda x: x>=12000); end of sequence
9901,
21742,
57836,
12332,
7456,
34978,
1944,
14800,
61482,
23634]
有没有办法选择大于12000的序列,包括小于12000的初始值,即期望的输出是:
[1, 149, 11249, 57305, 38044, 35283, 24819, 26463, 18689, 25472]
这对于两个 for 循环来说是微不足道的,但我正在寻找一种 itertools 类型的方式(也许是单线?)在不重置
lcg
生成器的情况下组合两个运算符。
为现有
itertools
库的问题编写单行代码的一种方法是使用带有 {0}
作为默认值的标志变量来指示要使用哪个谓词。首先,标志评估为真值,以便第一个谓词 (x < 12000
) 有效,如果第一个谓词失败,弹出集合,使标志变为假值,使第二个谓词 (x >= 12000
) 有效。通过从集合中弹出0
,当第一个谓词失败时,它还允许表达式回退到同一迭代中的第二个谓词:
takewhile(lambda x, f={0}: f and (x < 12000 or f.pop()) or x >= 12000, lcg())
请注意,在这种情况下使用可变对象作为参数的默认值是安全的,因为
lambda
每次评估时都会创建一个新的函数对象以及可变对象的新实例。
像 blhsing,我使用 single
takewhile
和 stateful 谓词。但我使用生成器,所以 Python 会为我跟踪状态,作为我的代码的进度。这样会更快,至少对于更长的案例来说是这样。
def pred():
x = yield
while x < 12000:
x = yield True
while x >= 12000:
x = yield True
yield False
pred = pred()
next(pred)
result = takewhile(pred.send, lcg())
print(*result)
输出(在线尝试!):
1 149 11249 57305 38044 35283 24819 26463 18689 25472
如果您想要两个以上的阶段,则易于扩展,只需添加更多循环即可。
你的小例子的基准测试结果:
lcg()
3.28 ± 0.02 μs Stefan_nonlocal
3.33 ± 0.01 μs Stefan_generator
3.41 ± 0.01 μs blhsing
3.81 ± 0.02 μs blhsing_old
Stefan_generator
是我上面的解决方案。 Stefan_nonlocal
而不是使用 bool
标志,我想看看它与 blhsing 的 set
标志相比如何。 blhsing
是他们当前的解决方案,带有一个 pop()
,而 blhsing_old
是带有多个 clear()
的旧版本。
大部分时间花在你的
lcg
生成器上,所以我也尝试在预先计算的结果列表上运行解决方案,以更好地比较在我们的解决方案中花费的时间:
list(islice(lcg(), 20))
1.67 ± 0.03 μs Stefan_nonlocal
1.75 ± 0.01 μs Stefan_generator
1.79 ± 0.03 μs blhsing
2.17 ± 0.03 μs blhsing_old
我尝试了更长的案例,我的发电机稍微高一点的设置成本通过更快的使用而得到回报。两个阶段各有 100 到 10000 个元素:
100 and 100
14.08 ± 0.18 μs Stefan_generator
18.58 ± 0.19 μs Stefan_nonlocal
19.94 ± 0.40 μs blhsing
25.40 ± 0.31 μs blhsing_old
1000 and 1000
128.40 ± 1.00 μs Stefan_generator
178.23 ± 0.87 μs Stefan_nonlocal
190.18 ± 0.76 μs blhsing
239.82 ± 3.00 μs blhsing_old
10000 and 10000
1.26 ± 0.00 ms Stefan_generator
1.76 ± 0.01 ms Stefan_nonlocal
1.87 ± 0.00 ms blhsing
2.35 ± 0.01 ms blhsing_old
完整的基准代码(在线尝试!):
from timeit import timeit
from time import time
from statistics import mean, stdev
from collections import deque
from itertools import takewhile, islice
def Stefan_generator(iterable):
def pred():
x = yield
while x < 12000:
x = yield True
while x >= 12000:
x = yield True
yield False
pred = pred()
next(pred)
return takewhile(pred.send, iterable)
def Stefan_nonlocal(iterable):
first = True
def pred(x):
nonlocal first
if first:
if x < 12000:
return True
first = False
return x >= 12000
return takewhile(pred, iterable)
def blhsing(iterable):
return takewhile(lambda x, f={0}: f and (x < 12000 or f.pop()) or x >= 12000, iterable)
def blhsing_old(iterable):
return takewhile(lambda x, f={1}: f and x < 12000 or f.clear() or x >= 12000, iterable)
funcs = Stefan_generator, Stefan_nonlocal, blhsing, blhsing_old
def lcg(a=75, c=74, m=2 ** 16 + 1, x0=1):
xn = x0
yield xn
while True:
xn = (a * xn + c) % m
yield xn
if 0: # make true to show example outputs
for f in funcs:
print(*f(lcg()))
print(*f([13000]))
print(*f([6000]))
print(*f([6000, float('nan'), 12000]))
print()
def test(title, iterable, number, unit, scale):
print()
print(title)
t0 = time()
times = {f: [] for f in funcs}
def stats(f):
ts = [t * scale for t in sorted(times[f])[:10]]
return f'{mean(ts):6.2f} ± {stdev(ts):4.2f} {unit} '
for _ in range(100):
for f in funcs:
t = timeit(lambda: deque(f(iterable), 0), number=number) / number
times[f].append(t)
for f in sorted(funcs, key=stats):
print(stats(f), f.__name__)
print(time() - t0)
class Lcg:
__iter__ = lcg.__call__
test('lcg()', Lcg(), 2500, 'μs', 1e6)
test('list(islice(lcg(), 20))', list(islice(lcg(), 20)), 5000, 'μs', 1e6)
for _ in range(1):
test('100 and 100', [6000] * 100 + [18000] * 100 + [6000], 500, 'μs', 1e6)
test('1000 and 1000', [6000] * 1000 + [18000] * 1000 + [6000], 50, 'μs', 1e6)
test('10000 and 10000', [6000] * 10000 + [18000] * 10000 + [6000], 5, 'ms', 1e3)