我能够使用循环计算一维数组的滚动相关系数(针对
[0, 1, 2, 3, 4]
的数据)。
我正在寻找使用 numpy
(不是 pandas
) 的更智能解决方案。
这是我当前的代码:
import numpy as np
data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
x = np.zeros_like(data).astype('float32')
length = 5
for i in range(length, data.shape[0]):
x[i] = np.corrcoef(data[i - length:i], np.arange(length))[0, 1]
print(x)
x 给出:
[ 0. 0. 0. 0. 0. 0.607 0.959 0.98 0.328 -0.287
-0.61 -0.314 -0.18 -0.8 -0.782 -0.847 -0.811 -0.825 -0.869 -0.283
0.566 0.863 0.643 0.454]
有没有没有循环的解决方案?
numpy.lib.stride_tricks.sliding_window_view
(在 numpy v1.20.0+ 中可用)
swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))
给出了
data
数组的视图,如下所示:
array([[10, 5, 8, 9, 15],
[ 5, 8, 9, 15, 22],
[ 8, 9, 15, 22, 26],
[ 9, 15, 22, 26, 11],
[15, 22, 26, 11, 15],
[22, 26, 11, 15, 16],
[26, 11, 15, 16, 18],
[11, 15, 16, 18, 7],
[15, 16, 18, 7, 4],
[16, 18, 7, 4, 8],
[18, 7, 4, 8, -2],
[ 7, 4, 8, -2, -3],
[ 4, 8, -2, -3, -4],
[ 8, -2, -3, -4, -6],
[-2, -3, -4, -6, -2],
[-3, -4, -6, -2, 0],
[-4, -6, -2, 0, 10],
[-6, -2, 0, 10, 0],
[-2, 0, 10, 0, 5],
[ 0, 10, 0, 5, 8]])
np.corrcoef
不接受 axis
参数,它将计算应用于 整个 矩阵,并且没有提供对每行/列执行此操作的方法。
但是,两个向量的相关系数的计算非常简单:
在这里应用:
def vec_corrcoef(X, y, axis=1):
Xm = np.mean(X, axis=axis, keepdims=True)
ym = np.mean(y)
n = np.sum((X - Xm) * (y - ym), axis=axis)
d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
return n / d
现在,用我们的数组和
arange
: 调用这个函数
cc = vec_corrcoef(swindow, np.arange(length))
给出了期望的结果:
array([ 0.60697698, 0.95894955, 0.98 , 0.3279521 , -0.28709766,
-0.61035663, -0.31390158, -0.17995394, -0.80041656, -0.78192905,
-0.84702587, -0.81091772, -0.82464375, -0.86892667, -0.28347335,
0.56568542, 0.86304424, 0.64326752, 0.45374261, 0.38135638])
要获取
x
,只需设置正确大小的 zeros
数组的适当索引即可。
注意:我认为你的
x
应该包含从 4
索引开始的非零值(因为滑动窗口已满),而不是从索引 5
开始。
x = np.zeros(data.shape)
x[-len(cc):] = cc
如果您确定您的值应从索引
5
开始,那么您可以执行以下操作:
x = np.zeros(data.shape)
x[length:] = cc[:-1] # Ignore the last value in cc
将原始方法的运行时间与此处答案中建议的方法进行比较:
f_OP_loopy
是你的方法,它使用循环实现滑动窗口f_PH_numpy
是我的方法,它使用sliding_window_view
和向量化函数来逐行计算向量相关系数f_RA_numpy
是 Rontogiannis 的方法,它平铺 arange
,计算整个矩阵的相关系数,并且仅选择最后一列的前 len(data) - length
行f_RA_recur
是 Rontogiannis 的递归方法,但我没有计时,因为它错过了最后一个相关系数。np.corrcoef
计算中还是在 中np.tile
操作。注意:该图是在我的 2.2GHz i7 Macbook Air、8GB RAM、Python 3.10.7 和 numpy 1.23.3 上获得的。在 Google Colab 上也得到了类似的结果
如果您对计时代码感兴趣,这里是:
import timeit
import numpy as np
from matplotlib import pyplot as plt
def time_funcs(funcs, sizes, arg_gen, N=20):
times = np.zeros((len(sizes), len(funcs)))
gdict = globals().copy()
for i, s in enumerate(sizes):
args = arg_gen(s)
print(args)
for j, f in enumerate(funcs):
gdict.update(locals())
try:
times[i, j] = timeit.timeit("f(*args)", globals=gdict, number=N) / N
print(f"{i}/{len(sizes)}, {j}/{len(funcs)}, {times[i, j]}")
except ValueError:
print(f"ERROR in {f}, with args=", *args)
return times
def plot_times(times, funcs):
fig, ax = plt.subplots()
for j, f in enumerate(funcs):
ax.plot(sizes, times[:, j], label=f.__name__)
ax.set_xlabel("Array size")
ax.set_ylabel("Time per function call (s)")
ax.set_xscale("log")
ax.set_yscale("log")
ax.legend()
ax.grid()
fig.tight_layout()
return fig, ax
#%%
def arg_gen(n):
return [np.random.randint(-100, 100, (n,)), 5]
#%%
def f_OP_loopy(data, length):
x = np.zeros_like(data).astype('float32')
for i in range(length-1, data.shape[0]):
x[i] = np.corrcoef(data[i - length + 1:i+1], np.arange(length))[0, 1]
return x
def f_PH_numpy(data, length):
swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))
cc = vec_corrcoef(swindow, np.arange(length))
x = np.zeros(data.shape)
x[-len(cc):] = cc
return x
def f_RA_recur(data, length):
return np.concatenate((
np.zeros([length,]),
rolling_correlation_recurse(data, 0, length)
))
def f_RA_numpy(data, length):
n = len(data)
cc = np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1]
x = np.zeros(data.shape)
x[-len(cc):] = cc
return x
#%%
def rolling_correlation_recurse(data, i, length) :
assert i+length < data.size
left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
if i+length+1 == data.size :
return left
right = rolling_correlation_recurse(data, i+1, length)
return np.concatenate((left, right))
def vec_corrcoef(X, y, axis=1):
Xm = np.mean(X, axis=axis, keepdims=True)
ym = np.mean(y)
n = np.sum((X - Xm) * (y - ym), axis=axis)
d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
return n / d
#%%
if __name__ == "__main__":
#%% Set up sim
sizes = [5, 10, 50, 100, 500, 1000, 5000, 10_000] #, 50_000, 100_000]
funcs = [f_OP_loopy, #f_RA_recur,
f_PH_numpy, f_RA_numpy]
#%% Run timing
time_fcalls = np.zeros((len(sizes), len(funcs))) * np.nan
time_fcalls = time_funcs(funcs, sizes, arg_gen)
fig, ax = plot_times(time_fcalls, funcs)
ax.set_xlabel(f"Input size")
plt.show()
input("Enter x to exit")
询问,你就会收到。这是使用递归的解决方案:
import numpy as np
data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
length = 5
def rolling_correlation_recurse(data, i, length) :
assert i+length < data.size
left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
if i+length+1 == data.size :
return left
right = rolling_correlation_recurse(data, i+1, length)
return np.concatenate((left, right))
def rolling_correlation(data, length) :
return np.concatenate((
np.zeros([length,]),
rolling_correlation_recurse(data, 0, length)
))
print(rolling_correlation(data, length))
编辑:这也是一个 numpy 解决方案:
n = len(data)
print(np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1])
@pho 的回答非常好,但在我的用例中性能仍然不能令人满意。主要问题是滑动窗口平方和的计算仍然是针对每个窗口单独进行的。计算下一个滑动窗口的平方和时。本能地,人们可以简单地减去最早的元素并添加最新的元素。
因此我尝试寻找滑动窗口运算符,并从这个答案中获得灵感。粗略的想法是扩展皮尔逊相关系数公式并使用高效的移动窗口运算符来代替求和。
分母中,y的标准差只计算一次。 x的滑动窗口标准差的计算可以重写为:
经过这些转换,滑动窗口计算分为三项:X和y的卷积、X的平方均值和x的均值。通过
np.correlate(X,y)
可以高效地完成卷积。另外两个可以使用scipy.ndimage.filters.uniform_filter
来完成,请参考这个问题中的答案。
生成的代码如下所示:
from scipy.ndimage.filters import uniform_filter1d
import numpy as np
def window_corrcoef(arr1, arr2):
length = len(arr2) # window size
# Calculate the slicing index
start_slice = length // 2
end_slice = -(length - start_slice - 1)
if end_slice == 0:
end_slice = None
# sliding window mean and squared mean of arr1
c1 = uniform_filter1d(arr1, length, mode='constant')[start_slice:end_slice]
c2 = uniform_filter1d(arr1 * arr1, length, mode='constant')[start_slice:end_slice]
Xstd = np.sqrt(c2 - c1 * c1)
# mean and std of arr2 will only be calculated once
n = np.correlate(arr1, arr2) / length - c1 * np.mean(arr2)
d = Xstd * np.std(arr2)
return n / d
代码可能并不完美,因为我假设
arr2
的长度应该小于 arr1
。函数 uniform_filter1d
将输出与输入大小相同的数组。因此需要切片。切片代码同时处理 arr2
的奇数和偶数大小,但有点混乱。也许有更聪明的方法可以完成。
为了进行性能比较,我从 @pho 窃取了代码。我的方法被标记为
f_sj_scipy
。实际上,在我的用例中,arr1 的长度约为 60k,arr2 的长度约为 20k。差异可能高达 10 倍。