(节省内存)将`sorted`作为生成器来实现

问题描述 投票:1回答:1

我想这不是一个特别新的话题,我想有比我的更好的实现。我正在寻找(a)我正在处理的算法类型--它的实际名称或类似的名称--以及(b)潜在的更好的实现。

一般问题:想象一个列表 a,它非常长--太长了,无法多次放入内存。列表中包含了一个 "随机 "的事物序列,这些事物允许被排序(<, >== 工作)。) 我想按升序迭代列表中的所有条目,包括重复的条目,但不复制列表,也不产生任何类似 "极端 "长度的条目。我还想保持列表中条目的原始顺序。a即原地 sort 被排除在外。所以我基本上希望在不修改原始数据源的情况下,尽量减少排序所需的内存。

Python的sorted并不接触原始数据,而是生成一个新的列表,与原始数据具有相同的大小长度。因此,我的基本想法是重新实施 sorted 作为发电机。

def sorted_nocopy_generator(data_list):
    state_max = max(data_list)
    state = min(data_list)
    state_count = data_list.count(state)
    for _ in range(state_count):
        yield state
    index = state_count
    while index < len(data_list):
        new_state = state_max
        for entry in data_list:
            if state < entry < new_state:
                new_state = entry
        state = new_state
        state_count = data_list.count(state)
        for _ in range(state_count):
            yield state
        index += state_count

它可以被测试如下。

import random

a_min = 0
a_max = 10000
a = list(range(a_min, a_max)) # test data
a.extend((random.randint(a_min, a_max - 1) for _ in range(len(a) // 10))) # some double entries
random.shuffle(a) # "random" order
a_control = a.copy() # for verification that a is not altered

a_test_sorted_nocopy_generator = list(sorted_nocopy_generator(a))
assert a == a_control
a_test_sorted = sorted(a)
assert a == a_control
assert a_test_sorted == a_test_sorted_nocopy_generator

它的尺度是O(N^2), 就像bubblesort那样. 我在寻找什么样的算法?如何优化这个东西(可能是通过买卖 一些 内存)?)

python algorithm sorting generator
1个回答
4
投票

在这里画个草图。 哪儿 N = len(data_list)S = sqrt(N)使用 O(S) 额外的内存,并采取最坏的情况 O(N*log(N)) 时间。

  • 每一个连续的片长 S 中的原始数据,将该片复制到一个临时列表中,使用 .sort() 来对其进行排序,然后将结果写入一个唯一的临时文件。 将会有大约 S 所有的临时文件。

  • 将这些临时文件送入 heapq.merge(). 那是一个发电机,只追踪到了 S 当前最小值 S 输入,所以这部分也有 O(S) 内存负担。

  • 删除临时文件。

内存越多,所需的临时文件越少,速度也就越快。

削减常数因子

正如评论中指出的,次二次元时间算法的希望不大。 然而,你可以通过削减数据的传递次数来削减原算法中的常数因子。 这是一个方法,产生下一个 K 每次通过数据时的条目。 不过总体上仍然是二次元时间。

def sorted_nocopy_generator(data_list, K=100):
    import itertools
    from bisect import insort

    assert K >= 1
    total = 0
    too_small = None

    while total < len(data_list):
        active = [] # hold the next K entries
        entry2count = {}
        for entry in data_list:
            if entry in entry2count:
                entry2count[entry] += 1
            elif ((too_small is None or too_small < entry) and
                  (len(active) < K or entry < active[-1])):
                insort(active, entry)
                entry2count[entry] = 1
                if len(active) > K: # forget the largest
                    del entry2count[active.pop()]
        for entry in active:
            count = entry2count[entry]
            yield from itertools.repeat(entry, count)
            total += count
        too_small = active[-1]

而消除最坏的情况

就像 @btilly 的回答一样,上面代码中最糟糕的情况可以通过使用 max heap 来规避。 然后在上面的代码中添加一个新的条目到 active 有最坏情况下的时间 O(log(K)) 而不是 O(K).

幸运的是... heapq 模块已经为这个目的提供了一些可用的东西。 heapq 的用途并没有暴露出来。

所以下面的特殊情况下,最大的最小的 K 感兴趣的条目,使用 .count() (就像你原来的程序一样)做一个完整的通道来计算有多少个。

但不需要对 每一 独一无二的元素,它只需要在每一个 K 元素。

额外的内存使用量与 K.

def sorted_nocopy_generator(data_list, K=100):
    import itertools
    from heapq import nsmallest

    assert K >= 1
    too_small = None
    ntodo = len(data_list)

    while ntodo:
        if too_small is None:
            active = nsmallest(K, data_list)
        else:
            active = nsmallest(K, (x for x in data_list
                                     if x > too_small))
        too_small = active[-1]
        for x in active:
            if x == too_small:
                break
            yield x
            ntodo -= 1
        count = data_list.count(too_small)
        yield from itertools.repeat(too_small, count)
        ntodo -= count
        assert ntodo >= 0

3
投票

假设你有以下的记忆 k 的东西在一个单独的列表中。 然后下面的比例会像 O((n + n^2/k) log(k)). 在极端情况下 k 常数,它是二次型的,如果 k 是一个固定的分数 n 那就是 O(n log(n)).

这个想法是建立一个缓冲区的。k 最小的东西,你还没有返回。 把它变成一个迷你堆,然后用堆操作及时从它那里返回东西。O(log(k)) 每返回一个元素。 当缓冲区为空时,重新填满缓冲区,然后再进行之前的操作。 缓冲区开始时是空的)。

为了创建缓冲区,我们扫描数组,放入比最后返回的元素大的元素,直到达到 k 缓冲区中的东西。 然后把它变成一个最大堆,当堆中的东西大于返回的东西,小于已有的东西时,就把它替换掉。 当它被重新填满时,将它重新组织成一个min-heap。

你将需要重新填充缓冲区 n/k 次。 每一次都是 O(n log(k)) 操作最坏情况。 (如果 k << n 而数组是随机顺序的,那么平均运行时间是 O(n) 因为几乎所有的东西都会与堆中的最大值进行比较,然后丢弃)。)

重要的边缘情况。 如果你有重复的东西,你需要考虑到你的缓冲区只包含了一些最大的东西的副本的情况。 一种方法是跟踪最大的东西有多少副本应该在堆中,但实际上并没有存储在那里。 这样在你清空堆后,你就可以把所有缺失的副本也返回来,然后继续扫描更大的元素。

© www.soinside.com 2019 - 2024. All rights reserved.