有效地为每个坏值将n个单元格向右延伸n个单元格

问题描述 投票:8回答:7

假设我有一个长度为30的数组,其中包含4个错误值。我想为那些不好的值创建一个掩码,但由于我将使用滚动窗口函数,我还希望在每个坏值被标记为坏之后有一定数量的后续索引。在下面,n = 3:

enter image description here

我想尽可能高效地执行此操作,因为此例程将在包含数十亿个数据点的大型数据系列上运行多次。因此,我需要尽可能接近numpy矢量化解决方案,因为我想避免python循环。

为了避免重新输入,这里是数组:

import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
python numpy bigdata
7个回答
5
投票

又一个答案! 它只需要你已经拥有的掩码并应用它本身的逻辑或移位版本。很好地矢量化和疯狂快速! :d

def repeat_or(a, n=4):
    m = np.isnan(a)
    k = m.copy()

    # lenM and lenK say for each mask how many
    # subsequent Trues there are at least
    lenM, lenK = 1, 1

    # we run until a combination of both masks will give us n or more
    # subsequent Trues
    while lenM+lenK < n:
        # append what we have in k to the end of what we have in m
        m[lenM:] |= k[:-lenM]

        # swap so that m is again the small one
        m, k = k, m

        # update the lengths
        lenM, lenK = lenK, lenM+lenK

    # see how much m has to be shifted in order to append the missing Trues
    k[n-lenM:] |= m[:-n+lenM]

    return k

不幸的是我无法运行m[i:] |= m[:-i] ...修改和使用掩码修改自己可能是一个坏主意。它确实适用于m[:-i] |= m[i:],但这是错误的方向。 无论如何,我们现在有了斐波纳契式的增长,而不是二次增长,这仍然优于线性增长。 (我从未想过我实际上会编写一个与Fibonacci序列真正相关的算法而不会出现一些奇怪的数学问题。)

在“真实”条件下使用1e6和1e5 NANs数组进行测试:

In [5]: a = np.random.random(size=1e6)

In [6]: a[np.random.choice(np.arange(len(a), dtype=int), 1e5, replace=False)] = np.nan

In [7]: %timeit reduceat(a)
10 loops, best of 3: 65.2 ms per loop

In [8]: %timeit index_expansion(a)
100 loops, best of 3: 12 ms per loop

In [9]: %timeit cumsum_trick(a)
10 loops, best of 3: 17 ms per loop

In [10]: %timeit repeat_or(a)
1000 loops, best of 3: 1.9 ms per loop

In [11]: %timeit agml_indexing(a)
100 loops, best of 3: 6.91 ms per loop

我将给托马斯留下更多的基准。


4
投票

这也可以被认为是morphological dilation问题,在这里使用scipy.ndimage.binary_dilation

def dilation(a, n):
    m = np.isnan(a)
    s = np.full(n, True, bool)
    return ndimage.binary_dilation(m, structure=s, origin=-(n//2))

关于origin的注意事项:这个参数确保structure(我称之为内核)从input(你的面具m)的左边开始。通常out[i]的值是structurestructure[n//2]中心(可能是in[i])的扩张,但是你想要structure[0]in[i]

您也可以使用Falses在左侧填充的内核执行此操作,如果您使用binary_dilation中的scikit-image,则需要使用该内核:

def dilation_skimage(a, n):
    m = np.isnan(a)
    s = np.zeros(2*n - n%2, bool)
    s[-n:] = True
    return skimage.morphology.binary_dilation(m, selem=s)

两者之间的时间似乎没有太大变化:

dilation_scipy
small:    10 loops, best of 3: 47.9 ms per loop
large: 10000 loops, best of 3: 88.9 µs per loop

dilation_skimage
small:    10 loops, best of 3: 47.0 ms per loop
large: 10000 loops, best of 3: 91.1 µs per loop

4
投票

OP在这里有基准测试结果。我已经包含了我自己开始使用的自己(“op”),它循环遍历不良索引并向它们添加1 ... n然后使用uniques来查找掩码索引。您可以在下面的代码中看到它与所有其他响应。

无论如何这里是结果。小平面是沿x(10到10e7)的阵列大小和沿y(5,50,500,5000)的窗口大小。然后是每个方面的编码器,得到log-10得分因为我们正在谈论微秒到几分钟。

enter image description here

@swenzel似乎是他的第二个答案的赢家,取代了@moarningsun的第一个答案(moarningsun的第二个答案是通过大量内存使用来崩溃机器,但这可能是因为它不是为大n或非稀疏a设计的)。

由于(必要的)对数比例,该图表对这些贡献中最快的贡献不公平。它们比几乎不错的循环解决方案快几十倍,几百倍。在最大的情况下,swenzel1比op快1000倍,并且op已经在使用numpy。

请注意,我使用了针对优化的英特尔MKL库编译的numpy版本,它充分利用了自2012年以来的AVX指令。在一些矢量用例中,这将使i7 / Xeon速度提高5倍。捐款可能比其他捐款受益更多。

这是到目前为止运行所有提交的答案的完整代码,包括我自己的答案。函数allagree()确保结果是正确的,而timeall()将为您提供一个长形式的pandas Dataframe,其中所有结果都以秒为单位。

您可以使用新代码轻松地重新运行它,或者更改我的假设。请记住,我没有考虑其他因素,如内存使用情况。此外,我使用R ggplot2作为图形,因为我不知道seaborn / matplotlib足以让它做我想要的。

为了完整起见,所有结果都同意:

In [4]: allagree(n = 7, asize = 777)
Out[4]:
             AGML0 AGML1 askewchan0 askewchan1 askewchan2 moarningsun0  \
AGML0         True  True       True       True       True         True
AGML1         True  True       True       True       True         True
askewchan0    True  True       True       True       True         True
askewchan1    True  True       True       True       True         True
askewchan2    True  True       True       True       True         True
moarningsun0  True  True       True       True       True         True
swenzel0      True  True       True       True       True         True
swenzel1      True  True       True       True       True         True
op            True  True       True       True       True         True

             swenzel0 swenzel1    op
AGML0            True     True  True
AGML1            True     True  True
askewchan0       True     True  True
askewchan1       True     True  True
askewchan2       True     True  True
moarningsun0     True     True  True
swenzel0         True     True  True
swenzel1         True     True  True
op               True     True  True

感谢所有提交的人!

使用pd中的pd.to_csv和read.csv导出timeall()输出后的图形代码:

ww <- read.csv("ww.csv")    
ggplot(ww, aes(x=coder, y=value, col = coder)) + geom_point(size = 3) + scale_y_continuous(trans="log10")+ facet_grid(nsize ~ asize) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + ggtitle("Fastest by coder") + ylab("time (seconds)")

测试代码:

# test Stack Overflow 32706135 nan shift routines

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from timeit import Timer
from scipy import ndimage
from skimage import morphology
import itertools
import pdb
np.random.seed(8472)


def AGML0(a, n):                               # loop itertools
    maskleft = np.where(np.isnan(a))[0]
    maskright = maskleft + n
    mask = np.zeros(len(a),dtype=bool)
    for l,r in itertools.izip(maskleft,maskright): 
        mask[l:r] = True
    return mask


def AGML1(a, n):                               # loop n
    nn = n - 1
    maskleft = np.where(np.isnan(a))[0]
    ghost_mask = np.zeros(len(a)+nn,dtype=bool)
    for i in range(0, nn+1):
        thismask = maskleft + i
        ghost_mask[thismask] = True
    mask = ghost_mask[:len(ghost_mask)-nn]
    return mask


def askewchan0(a, n):
    m = np.isnan(a)
    i = np.arange(1, len(m)+1)
    ind = np.column_stack([i-n, i]) # may be a faster way to generate this
    ind.clip(0, len(m)-1, out=ind)
    return np.bitwise_or.reduceat(m, ind.ravel())[::2]


def askewchan1(a, n):
    m = np.isnan(a)
    s = np.full(n, True, bool)
    return ndimage.binary_dilation(m, structure=s, origin=-(n//2))


def askewchan2(a, n):
    m = np.isnan(a)
    s = np.zeros(2*n - n%2, bool)
    s[-n:] = True
    return morphology.binary_dilation(m, selem=s)


def moarningsun0(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask)
    cs[n:] -= cs[:-n].copy()
    return cs > 0


def moarningsun1(a, n):
    mask = np.isnan(a)
    idx = np.flatnonzero(mask)
    expanded_idx = idx[:,None] + np.arange(1, n)
    np.put(mask, expanded_idx, True, 'clip')
    return mask


def swenzel0(a, n):
    m = np.isnan(a)
    k = m.copy()
    for i in range(1, n):
        k[i:] |= m[:-i]
    return k


def swenzel1(a, n=4):
    m = np.isnan(a)
    k = m.copy()

    # lenM and lenK say for each mask how many
    # subsequent Trues there are at least
    lenM, lenK = 1, 1

    # we run until a combination of both masks will give us n or more
    # subsequent Trues
    while lenM+lenK < n:
        # append what we have in k to the end of what we have in m
        m[lenM:] |= k[:-lenM]

        # swap so that m is again the small one
        m, k = k, m

        # update the lengths
        lenM, lenK = lenK, lenM+lenK

    # see how much m has to be shifted in order to append the missing Trues
    k[n-lenM:] |= m[:-n+lenM]
    return k


def op(a, n):
    m = np.isnan(a)
    for x in range(1, n):
        m = np.logical_or(m, np.r_[False, m][:-1])
    return m


# all the functions in a list. NB these are the actual functions, not their names
funcs = [AGML0, AGML1, askewchan0, askewchan1, askewchan2, moarningsun0, swenzel0, swenzel1, op]

def allagree(fns = funcs, n = 10, asize = 100):
    """ make sure result is the same from all functions """
    fnames = [f.__name__ for f in fns]
    a = np.random.rand(asize)
    a[np.random.randint(0, asize, int(asize / 10))] = np.nan
    results = dict([(f.__name__, f(a, n)) for f in fns])
    isgood = [[np.array_equal(results[f1], results[f2]) for f1 in fnames] for f2 in fnames]
    pdgood = pd.DataFrame(isgood, columns = fnames, index = fnames)
    if not all([all(x) for x in isgood]):
        print "not all results identical"
        pdb.set_trace()
    return pdgood


def timeone(f):
    """ time one of the functions across the full range of a nd n """
    print "Timing", f.__name__
    Ns = np.array([10**x for x in range(0, 4)]) * 5 # 5 to 5000 window size
    As = [np.random.rand(10 ** x) for x in range(1, 8)] # up to 10 million data data points
    for i in range(len(As)): # 10% of points are always bad
        As[i][np.random.randint(0, len(As[i]), len(As[i]) / 10)] = np.nan
    results = np.array([[Timer(lambda: f(a, n)).timeit(number = 1) if n < len(a) \
                        else np.nan for n in Ns] for a in As])
    pdresults = pd.DataFrame(results, index = [len(x) for x in As], columns = Ns)
    return pdresults


def timeall(fns = funcs):
    """ run timeone for all known funcs """
    testd = dict([(x.__name__, timeone(x)) for x in fns])
    testdf = pd.concat(testd.values(), axis = 0, keys = testd.keys())
    testdf.index.names = ["coder", "asize"]
    testdf.columns.names = ["nsize"]
    testdf.reset_index(inplace = True)
    testdf = pd.melt(testdf, id_vars = ["coder", "asize"])
    return testdf

3
投票

你可以使用np.ufunc.reduceatnp.bitwise_or

import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
m = np.isnan(a)
n = 4
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)

np.bitwise_or.reduceat(m, ind.ravel())[::2]

关于你的数据:

print np.column_stack([m, reduced])
[[False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [ True  True]
 [False  True]
 [False  True]
 [False  True]
 [False False]
 [ True  True]
 [ True  True]
 [False  True]
 [False  True]
 [False  True]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [False False]
 [ True  True]
 [False  True]]

3
投票

像这样的东西?

maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright): 
   mask[l:r] = True

或者,因为n很小,所以最好在它上面循环:

maskleft = np.where(np.isnan(a))[0]
mask = np.zeros(len(a),dtype=bool)
for i in range(0,n):
  thismask = maskleft+i
  mask[thismask] = True

除了n上的循环,以上是完全矢量化的。但是循环是完全可并行化的,因此您可以使用例如加速因子来获得因子n加速。多处理或Cython,如果你愿意去解决问题。

编辑:根据@askewchan解决方案2可能会导致超出范围的错误。它还在range(0,n)中有索引问题。可能的纠正:

maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+n,dtype=bool)
for i in range(0, n+1):
    thismask = maskleft + i
    ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-n]

3
投票

您可以使用与运行平均过滤器相同的cumsum技巧:

def cumsum_trick(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask)
    cs[n:] -= cs[:-n].copy()
    return cs > 0

不幸的是,需要额外的.copy(),因为 一些在内部进行的缓冲 操作的顺序。有可能说服numpy反向应用减法,但为了工作,cs数组必须有一个负步幅:

def cumsum_trick_nocopy(a, n):
    mask = np.isnan(a)
    cs = np.cumsum(mask, out=np.empty_like(a, int)[::-1])
    cs[n:] -= cs[:-n]
    out = cs > 0
    return out

但这似乎很脆弱,反正其速度并不快。

我想知道是否有某个编译信号处理功能可以完成这个操作。


对于稀疏的初始掩模和小n这个也很快:

def index_expansion(a, n):
    mask = np.isnan(a)
    idx = np.flatnonzero(mask)
    expanded_idx = idx[:,None] + np.arange(1, n)
    np.put(mask, expanded_idx, True, 'clip')
    return mask

2
投票

几年后,我已经提出了一个完全矢量化的解决方案,不需要循环或副本(除了掩码本身)。这个解决方案有点(潜在)危险,因为它使用numpy.lib.stride_tricks.as_strided。它也没有@swentzel's solution那么快。

我们的想法是采用蒙版并创建它的2D视图,其中第二个维度就是当前元素之后的元素。然后你可以设置整个列到True,如果头是True。由于您正在处理视图,因此设置列实际上会在掩码中设置以下元素。

从数据开始:

import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
              9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
n = 3

现在,我们将使掩码a.size + n元素变长,这样您就不必手动处理最后的n元素:

mask = np.empty(a.size + n, dtype=np.bool)
np.isnan(a, out=mask[:a.size])
mask[a.size:] = False

现在很酷的部分:

view = np.lib.stride_tricks.as_strided(mask, shape=(n + 1, a.size),
                                       strides=mask.strides * 2)

最后一部分至关重要。 mask.strides是一个像(1,)这样的元组(因为bool通常大约有很多字节。加倍意味着你需要一个1字节的步骤来移动任何维度中的一个元素。

现在您需要做的就是扩展掩码:

view[1:, view[0]] = True

而已。现在mask有你想要的。请记住,这仅适用,因为赋值索引在最后更改的值之前。你无法逃脱view[1:] |= view[0]

出于长凳目的,似乎n的定义已从问题中改变,因此以下函数将此考虑在内:

def madphysicist0(a, n):
    m = np.empty(a.size + n - 1, dtype=np.bool)
    np.isnan(a, out=m[:a.size])
    m[a.size:] = False

    v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
    v[1:, v[0]] = True
    return v[0]

V2

从现有的最快答案中取出一个叶子,我们只需要复制log2(n)行,而不是n行:

def madphysicist1(a, n):
    m = np.empty(a.size + n - 1, dtype=np.bool)
    np.isnan(a, out=m[:a.size])
    m[a.size:] = False

    v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)

    stop = int(np.log2(n))
    for k in range(1, stop + 1):
        v[k, v[0]] = True
    if (1<<k) < n:
        v[-1, v[(1<<k) - 1]] = True
    return v[0]

由于这在每次迭代时都会使掩模的大小加倍,因此它的工作速度比斐波那契快一点:https://math.stackexchange.com/q/894743/295281

© www.soinside.com 2019 - 2024. All rights reserved.