假设我有一个长度为30的数组,其中包含4个错误值。我想为那些不好的值创建一个掩码,但由于我将使用滚动窗口函数,我还希望在每个坏值被标记为坏之后有一定数量的后续索引。在下面,n = 3:
我想尽可能高效地执行此操作,因为此例程将在包含数十亿个数据点的大型数据系列上运行多次。因此,我需要尽可能接近numpy矢量化解决方案,因为我想避免python循环。
为了避免重新输入,这里是数组:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
又一个答案! 它只需要你已经拥有的掩码并应用它本身的逻辑或移位版本。很好地矢量化和疯狂快速! :d
def repeat_or(a, n=4):
m = np.isnan(a)
k = m.copy()
# lenM and lenK say for each mask how many
# subsequent Trues there are at least
lenM, lenK = 1, 1
# we run until a combination of both masks will give us n or more
# subsequent Trues
while lenM+lenK < n:
# append what we have in k to the end of what we have in m
m[lenM:] |= k[:-lenM]
# swap so that m is again the small one
m, k = k, m
# update the lengths
lenM, lenK = lenK, lenM+lenK
# see how much m has to be shifted in order to append the missing Trues
k[n-lenM:] |= m[:-n+lenM]
return k
不幸的是我无法运行m[i:] |= m[:-i]
...修改和使用掩码修改自己可能是一个坏主意。它确实适用于m[:-i] |= m[i:]
,但这是错误的方向。
无论如何,我们现在有了斐波纳契式的增长,而不是二次增长,这仍然优于线性增长。
(我从未想过我实际上会编写一个与Fibonacci序列真正相关的算法而不会出现一些奇怪的数学问题。)
在“真实”条件下使用1e6和1e5 NANs数组进行测试:
In [5]: a = np.random.random(size=1e6)
In [6]: a[np.random.choice(np.arange(len(a), dtype=int), 1e5, replace=False)] = np.nan
In [7]: %timeit reduceat(a)
10 loops, best of 3: 65.2 ms per loop
In [8]: %timeit index_expansion(a)
100 loops, best of 3: 12 ms per loop
In [9]: %timeit cumsum_trick(a)
10 loops, best of 3: 17 ms per loop
In [10]: %timeit repeat_or(a)
1000 loops, best of 3: 1.9 ms per loop
In [11]: %timeit agml_indexing(a)
100 loops, best of 3: 6.91 ms per loop
我将给托马斯留下更多的基准。
这也可以被认为是morphological dilation问题,在这里使用scipy.ndimage.binary_dilation
:
def dilation(a, n):
m = np.isnan(a)
s = np.full(n, True, bool)
return ndimage.binary_dilation(m, structure=s, origin=-(n//2))
关于origin
的注意事项:这个参数确保structure
(我称之为内核)从input
(你的面具m
)的左边开始。通常out[i]
的值是structure
的structure[n//2]
中心(可能是in[i]
)的扩张,但是你想要structure[0]
在in[i]
。
您也可以使用False
s在左侧填充的内核执行此操作,如果您使用binary_dilation
中的scikit-image,则需要使用该内核:
def dilation_skimage(a, n):
m = np.isnan(a)
s = np.zeros(2*n - n%2, bool)
s[-n:] = True
return skimage.morphology.binary_dilation(m, selem=s)
两者之间的时间似乎没有太大变化:
dilation_scipy
small: 10 loops, best of 3: 47.9 ms per loop
large: 10000 loops, best of 3: 88.9 µs per loop
dilation_skimage
small: 10 loops, best of 3: 47.0 ms per loop
large: 10000 loops, best of 3: 91.1 µs per loop
OP在这里有基准测试结果。我已经包含了我自己开始使用的自己(“op”),它循环遍历不良索引并向它们添加1 ... n然后使用uniques来查找掩码索引。您可以在下面的代码中看到它与所有其他响应。
无论如何这里是结果。小平面是沿x(10到10e7)的阵列大小和沿y(5,50,500,5000)的窗口大小。然后是每个方面的编码器,得到log-10得分因为我们正在谈论微秒到几分钟。
@swenzel似乎是他的第二个答案的赢家,取代了@moarningsun的第一个答案(moarningsun的第二个答案是通过大量内存使用来崩溃机器,但这可能是因为它不是为大n或非稀疏a设计的)。
由于(必要的)对数比例,该图表对这些贡献中最快的贡献不公平。它们比几乎不错的循环解决方案快几十倍,几百倍。在最大的情况下,swenzel1比op快1000倍,并且op已经在使用numpy。
请注意,我使用了针对优化的英特尔MKL库编译的numpy版本,它充分利用了自2012年以来的AVX指令。在一些矢量用例中,这将使i7 / Xeon速度提高5倍。捐款可能比其他捐款受益更多。
这是到目前为止运行所有提交的答案的完整代码,包括我自己的答案。函数allagree()确保结果是正确的,而timeall()将为您提供一个长形式的pandas Dataframe,其中所有结果都以秒为单位。
您可以使用新代码轻松地重新运行它,或者更改我的假设。请记住,我没有考虑其他因素,如内存使用情况。此外,我使用R ggplot2作为图形,因为我不知道seaborn / matplotlib足以让它做我想要的。
为了完整起见,所有结果都同意:
In [4]: allagree(n = 7, asize = 777)
Out[4]:
AGML0 AGML1 askewchan0 askewchan1 askewchan2 moarningsun0 \
AGML0 True True True True True True
AGML1 True True True True True True
askewchan0 True True True True True True
askewchan1 True True True True True True
askewchan2 True True True True True True
moarningsun0 True True True True True True
swenzel0 True True True True True True
swenzel1 True True True True True True
op True True True True True True
swenzel0 swenzel1 op
AGML0 True True True
AGML1 True True True
askewchan0 True True True
askewchan1 True True True
askewchan2 True True True
moarningsun0 True True True
swenzel0 True True True
swenzel1 True True True
op True True True
感谢所有提交的人!
使用pd中的pd.to_csv和read.csv导出timeall()输出后的图形代码:
ww <- read.csv("ww.csv")
ggplot(ww, aes(x=coder, y=value, col = coder)) + geom_point(size = 3) + scale_y_continuous(trans="log10")+ facet_grid(nsize ~ asize) + theme(axis.text.x = element_text(angle = 90, hjust = 1)) + ggtitle("Fastest by coder") + ylab("time (seconds)")
测试代码:
# test Stack Overflow 32706135 nan shift routines
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from timeit import Timer
from scipy import ndimage
from skimage import morphology
import itertools
import pdb
np.random.seed(8472)
def AGML0(a, n): # loop itertools
maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright):
mask[l:r] = True
return mask
def AGML1(a, n): # loop n
nn = n - 1
maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+nn,dtype=bool)
for i in range(0, nn+1):
thismask = maskleft + i
ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-nn]
return mask
def askewchan0(a, n):
m = np.isnan(a)
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)
return np.bitwise_or.reduceat(m, ind.ravel())[::2]
def askewchan1(a, n):
m = np.isnan(a)
s = np.full(n, True, bool)
return ndimage.binary_dilation(m, structure=s, origin=-(n//2))
def askewchan2(a, n):
m = np.isnan(a)
s = np.zeros(2*n - n%2, bool)
s[-n:] = True
return morphology.binary_dilation(m, selem=s)
def moarningsun0(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask)
cs[n:] -= cs[:-n].copy()
return cs > 0
def moarningsun1(a, n):
mask = np.isnan(a)
idx = np.flatnonzero(mask)
expanded_idx = idx[:,None] + np.arange(1, n)
np.put(mask, expanded_idx, True, 'clip')
return mask
def swenzel0(a, n):
m = np.isnan(a)
k = m.copy()
for i in range(1, n):
k[i:] |= m[:-i]
return k
def swenzel1(a, n=4):
m = np.isnan(a)
k = m.copy()
# lenM and lenK say for each mask how many
# subsequent Trues there are at least
lenM, lenK = 1, 1
# we run until a combination of both masks will give us n or more
# subsequent Trues
while lenM+lenK < n:
# append what we have in k to the end of what we have in m
m[lenM:] |= k[:-lenM]
# swap so that m is again the small one
m, k = k, m
# update the lengths
lenM, lenK = lenK, lenM+lenK
# see how much m has to be shifted in order to append the missing Trues
k[n-lenM:] |= m[:-n+lenM]
return k
def op(a, n):
m = np.isnan(a)
for x in range(1, n):
m = np.logical_or(m, np.r_[False, m][:-1])
return m
# all the functions in a list. NB these are the actual functions, not their names
funcs = [AGML0, AGML1, askewchan0, askewchan1, askewchan2, moarningsun0, swenzel0, swenzel1, op]
def allagree(fns = funcs, n = 10, asize = 100):
""" make sure result is the same from all functions """
fnames = [f.__name__ for f in fns]
a = np.random.rand(asize)
a[np.random.randint(0, asize, int(asize / 10))] = np.nan
results = dict([(f.__name__, f(a, n)) for f in fns])
isgood = [[np.array_equal(results[f1], results[f2]) for f1 in fnames] for f2 in fnames]
pdgood = pd.DataFrame(isgood, columns = fnames, index = fnames)
if not all([all(x) for x in isgood]):
print "not all results identical"
pdb.set_trace()
return pdgood
def timeone(f):
""" time one of the functions across the full range of a nd n """
print "Timing", f.__name__
Ns = np.array([10**x for x in range(0, 4)]) * 5 # 5 to 5000 window size
As = [np.random.rand(10 ** x) for x in range(1, 8)] # up to 10 million data data points
for i in range(len(As)): # 10% of points are always bad
As[i][np.random.randint(0, len(As[i]), len(As[i]) / 10)] = np.nan
results = np.array([[Timer(lambda: f(a, n)).timeit(number = 1) if n < len(a) \
else np.nan for n in Ns] for a in As])
pdresults = pd.DataFrame(results, index = [len(x) for x in As], columns = Ns)
return pdresults
def timeall(fns = funcs):
""" run timeone for all known funcs """
testd = dict([(x.__name__, timeone(x)) for x in fns])
testdf = pd.concat(testd.values(), axis = 0, keys = testd.keys())
testdf.index.names = ["coder", "asize"]
testdf.columns.names = ["nsize"]
testdf.reset_index(inplace = True)
testdf = pd.melt(testdf, id_vars = ["coder", "asize"])
return testdf
你可以使用np.ufunc.reduceat
和np.bitwise_or
:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
m = np.isnan(a)
n = 4
i = np.arange(1, len(m)+1)
ind = np.column_stack([i-n, i]) # may be a faster way to generate this
ind.clip(0, len(m)-1, out=ind)
np.bitwise_or.reduceat(m, ind.ravel())[::2]
关于你的数据:
print np.column_stack([m, reduced])
[[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[ True True]
[False True]
[False True]
[False True]
[False False]
[ True True]
[ True True]
[False True]
[False True]
[False True]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[False False]
[ True True]
[False True]]
像这样的东西?
maskleft = np.where(np.isnan(a))[0]
maskright = maskleft + n
mask = np.zeros(len(a),dtype=bool)
for l,r in itertools.izip(maskleft,maskright):
mask[l:r] = True
或者,因为n很小,所以最好在它上面循环:
maskleft = np.where(np.isnan(a))[0]
mask = np.zeros(len(a),dtype=bool)
for i in range(0,n):
thismask = maskleft+i
mask[thismask] = True
除了n上的循环,以上是完全矢量化的。但是循环是完全可并行化的,因此您可以使用例如加速因子来获得因子n加速。多处理或Cython,如果你愿意去解决问题。
编辑:根据@askewchan解决方案2可能会导致超出范围的错误。它还在range(0,n)
中有索引问题。可能的纠正:
maskleft = np.where(np.isnan(a))[0]
ghost_mask = np.zeros(len(a)+n,dtype=bool)
for i in range(0, n+1):
thismask = maskleft + i
ghost_mask[thismask] = True
mask = ghost_mask[:len(ghost_mask)-n]
您可以使用与运行平均过滤器相同的cumsum技巧:
def cumsum_trick(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask)
cs[n:] -= cs[:-n].copy()
return cs > 0
不幸的是,需要额外的.copy()
,因为
一些在内部进行的缓冲
操作的顺序。有可能说服numpy反向应用减法,但为了工作,cs
数组必须有一个负步幅:
def cumsum_trick_nocopy(a, n):
mask = np.isnan(a)
cs = np.cumsum(mask, out=np.empty_like(a, int)[::-1])
cs[n:] -= cs[:-n]
out = cs > 0
return out
但这似乎很脆弱,反正其速度并不快。
我想知道是否有某个编译信号处理功能可以完成这个操作。
对于稀疏的初始掩模和小n
这个也很快:
def index_expansion(a, n):
mask = np.isnan(a)
idx = np.flatnonzero(mask)
expanded_idx = idx[:,None] + np.arange(1, n)
np.put(mask, expanded_idx, True, 'clip')
return mask
几年后,我已经提出了一个完全矢量化的解决方案,不需要循环或副本(除了掩码本身)。这个解决方案有点(潜在)危险,因为它使用numpy.lib.stride_tricks.as_strided
。它也没有@swentzel's solution那么快。
我们的想法是采用蒙版并创建它的2D视图,其中第二个维度就是当前元素之后的元素。然后你可以设置整个列到True
,如果头是True
。由于您正在处理视图,因此设置列实际上会在掩码中设置以下元素。
从数据开始:
import numpy as np
a = np.array([4, 0, 8, 5, 10, 9, np.nan, 1, 4, 9, 9, np.nan, np.nan, 9,\
9, 8, 0, 3, 7, 9, 2, 6, 7, 2, 9, 4, 1, 1, np.nan, 10])
n = 3
现在,我们将使掩码a.size + n
元素变长,这样您就不必手动处理最后的n
元素:
mask = np.empty(a.size + n, dtype=np.bool)
np.isnan(a, out=mask[:a.size])
mask[a.size:] = False
现在很酷的部分:
view = np.lib.stride_tricks.as_strided(mask, shape=(n + 1, a.size),
strides=mask.strides * 2)
最后一部分至关重要。 mask.strides
是一个像(1,)
这样的元组(因为bool通常大约有很多字节。加倍意味着你需要一个1字节的步骤来移动任何维度中的一个元素。
现在您需要做的就是扩展掩码:
view[1:, view[0]] = True
而已。现在mask
有你想要的。请记住,这仅适用,因为赋值索引在最后更改的值之前。你无法逃脱view[1:] |= view[0]
。
出于长凳目的,似乎n
的定义已从问题中改变,因此以下函数将此考虑在内:
def madphysicist0(a, n):
m = np.empty(a.size + n - 1, dtype=np.bool)
np.isnan(a, out=m[:a.size])
m[a.size:] = False
v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
v[1:, v[0]] = True
return v[0]
V2
从现有的最快答案中取出一个叶子,我们只需要复制log2(n)
行,而不是n
行:
def madphysicist1(a, n):
m = np.empty(a.size + n - 1, dtype=np.bool)
np.isnan(a, out=m[:a.size])
m[a.size:] = False
v = np.lib.stride_tricks.as_strided(m, shape=(n, a.size), strides=m.strides * 2)
stop = int(np.log2(n))
for k in range(1, stop + 1):
v[k, v[0]] = True
if (1<<k) < n:
v[-1, v[(1<<k) - 1]] = True
return v[0]
由于这在每次迭代时都会使掩模的大小加倍,因此它的工作速度比斐波那契快一点:https://math.stackexchange.com/q/894743/295281