itertools 运算符的顺序链接

问题描述 投票:0回答:2

我正在寻找一种顺序组合两个 itertools 运算符的好方法。例如,假设我们想在超过阈值后从生成器序列中选择小于阈值的数字。对于 12000 的阈值,这些对应于

it.takewhile(lambda x: x<12000)
it.takewhile(lambda x: x>=12000)

# Set up an example generator:
def lcg(a=75,c=74,m=2**16+1,x0 = 1):
    xn = x0
    yield xn
    while True:
        xn = (a*xn+c) % m
        yield xn
# First 20 elements:

list(it.islice(lcg(), 20))

[1,      # <- start sequence, start it.takewhile(lambda x: x<12000)
 149,
 11249,  # <- last element of it.takewhile(lambda x: x<12000)
 57305,  # <- start it.takewhile(lambda x: x>=12000) here
 38044,
 35283,
 24819,
 26463,
 18689,
 25472,  # <- last element of it.takewhile(lambda x: x>=12000); end of sequence
 9901,
 21742,
 57836,
 12332,
 7456,
 34978,
 1944,
 14800,
 61482,
 23634]

有没有办法选择大于12000的序列,包括小于12000的初始值,即期望的输出是:

[1, 149, 11249, 57305, 38044, 35283, 24819, 26463, 18689, 25472]

这对于两个 for 循环来说是微不足道的,但我正在寻找一种 itertools 类型的方式(也许是单线?)在不重置

lcg
生成器的情况下组合两个运算符。

python generator python-itertools
2个回答
3
投票

为现有

itertools
库的问题编写单行代码的一种方法是使用带有
{0}
作为默认值的标志变量来指示要使用哪个谓词。首先,标志评估为真值,以便第一个谓词 (
x < 12000
) 有效,如果第一个谓词失败,弹出集合,使标志变为假值,使第二个谓词 (
x >= 12000
) 有效。通过从集合中弹出
0
,当第一个谓词失败时,它还允许表达式回退到同一迭代中的第二个谓词:

takewhile(lambda x, f={0}: f and (x < 12000 or f.pop()) or x >= 12000, lcg())

请注意,在这种情况下使用可变对象作为参数的默认值是安全的,因为

lambda
每次评估时都会创建一个新的函数对象以及可变对象的新实例。

演示:https://replit.com/@blhsing/VirtualGuiltyRatio


1
投票

blhsing,我使用 single

takewhile
stateful 谓词。但我使用生成器,所以 Python 会为我跟踪状态,作为我的代码的进度。这样会更快,至少对于更长的案例来说是这样。

def pred():
    x = yield
    while x < 12000:
        x = yield True
    while x >= 12000:
        x = yield True
    yield False
pred = pred()
next(pred)
result = takewhile(pred.send, lcg())

print(*result)

输出(在线尝试!):

1 149 11249 57305 38044 35283 24819 26463 18689 25472

如果您想要两个以上的阶段,则易于扩展,只需添加更多循环即可。

你的小例子的基准测试结果:

lcg()
  3.28 ± 0.02 μs  Stefan_nonlocal
  3.33 ± 0.01 μs  Stefan_generator
  3.41 ± 0.01 μs  blhsing
  3.81 ± 0.02 μs  blhsing_old

Stefan_generator
是我上面的解决方案。
Stefan_nonlocal
而不是使用
bool
标志,我想看看它与 blhsing 的
set
标志相比如何。
blhsing
是他们当前的解决方案,带有一个
pop()
,而
blhsing_old
是带有多个
clear()
的旧版本。

大部分时间花在你的

lcg
生成器上,所以我也尝试在预先计算的结果列表上运行解决方案,以更好地比较在我们的解决方案中花费的时间:

list(islice(lcg(), 20))
  1.67 ± 0.03 μs  Stefan_nonlocal
  1.75 ± 0.01 μs  Stefan_generator
  1.79 ± 0.03 μs  blhsing
  2.17 ± 0.03 μs  blhsing_old

我尝试了更长的案例,我的发电机稍微高一点的设置成本通过更快的使用而得到回报。两个阶段各有 100 到 10000 个元素:

100 and 100
 14.08 ± 0.18 μs  Stefan_generator
 18.58 ± 0.19 μs  Stefan_nonlocal
 19.94 ± 0.40 μs  blhsing
 25.40 ± 0.31 μs  blhsing_old

1000 and 1000
128.40 ± 1.00 μs  Stefan_generator
178.23 ± 0.87 μs  Stefan_nonlocal
190.18 ± 0.76 μs  blhsing
239.82 ± 3.00 μs  blhsing_old

10000 and 10000
  1.26 ± 0.00 ms  Stefan_generator
  1.76 ± 0.01 ms  Stefan_nonlocal
  1.87 ± 0.00 ms  blhsing
  2.35 ± 0.01 ms  blhsing_old

完整的基准代码(在线尝试!):

from timeit import timeit
from time import time
from statistics import mean, stdev
from collections import deque
from itertools import takewhile, islice

def Stefan_generator(iterable):
    def pred():
        x = yield
        while x < 12000:
            x = yield True
        while x >= 12000:
            x = yield True
        yield False
    pred = pred()
    next(pred)
    return takewhile(pred.send, iterable)

def Stefan_nonlocal(iterable):
    first = True
    def pred(x):
        nonlocal first
        if first:
            if x < 12000:
                return True
            first = False
        return x >= 12000
    return takewhile(pred, iterable)

def blhsing(iterable):
    return takewhile(lambda x, f={0}: f and (x < 12000 or f.pop()) or x >= 12000, iterable)

def blhsing_old(iterable):
    return takewhile(lambda x, f={1}: f and x < 12000 or f.clear() or x >= 12000, iterable)

funcs = Stefan_generator, Stefan_nonlocal, blhsing, blhsing_old

def lcg(a=75, c=74, m=2 ** 16 + 1, x0=1):
    xn = x0
    yield xn
    while True:
        xn = (a * xn + c) % m
        yield xn

if 0:  # make true to show example outputs
  for f in funcs:
    print(*f(lcg()))
    print(*f([13000]))
    print(*f([6000]))
    print(*f([6000, float('nan'), 12000]))
    print()

def test(title, iterable, number, unit, scale):
    print()
    print(title)
    t0 = time()

    times = {f: [] for f in funcs}
    def stats(f):
        ts = [t * scale for t in sorted(times[f])[:10]]
        return f'{mean(ts):6.2f} ± {stdev(ts):4.2f} {unit} '
    for _ in range(100):
        for f in funcs:
            t = timeit(lambda: deque(f(iterable), 0), number=number) / number
            times[f].append(t)
    for f in sorted(funcs, key=stats):
        print(stats(f), f.__name__)
    print(time() - t0)

class Lcg:
    __iter__ = lcg.__call__

test('lcg()', Lcg(), 2500, 'μs', 1e6)
test('list(islice(lcg(), 20))', list(islice(lcg(), 20)), 5000, 'μs', 1e6)
for _ in range(1):
 test('100 and 100', [6000] * 100 + [18000] * 100 + [6000], 500, 'μs', 1e6)
test('1000 and 1000', [6000] * 1000 + [18000] * 1000 + [6000], 50, 'μs', 1e6)
test('10000 and 10000', [6000] * 10000 + [18000] * 10000 + [6000], 5, 'ms', 1e3)
© www.soinside.com 2019 - 2024. All rights reserved.