如何使用numpy计算向量滚动窗口上的相关系数?

问题描述 投票:0回答:3

我能够使用循环计算一维数组的滚动相关系数(针对

[0, 1, 2, 3, 4]
的数据)。 我正在寻找使用
numpy
(不是
pandas
的更智能解决方案。 这是我当前的代码:

import numpy as np
data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
x = np.zeros_like(data).astype('float32')
length = 5
for i in range(length, data.shape[0]):
    x[i] = np.corrcoef(data[i - length:i], np.arange(length))[0, 1]
print(x)

x 给出:

    [ 0.     0.     0.     0.     0.     0.607  0.959  0.98   0.328 -0.287
 -0.61  -0.314 -0.18  -0.8   -0.782 -0.847 -0.811 -0.825 -0.869 -0.283
  0.566  0.863  0.643  0.454]

有没有没有循环的解决方案?

python numpy correlation rolling-computation
3个回答
2
投票

使用

numpy.lib.stride_tricks.sliding_window_view
(在 numpy v1.20.0+ 中可用)

swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))

给出了

data
数组的视图,如下所示:

array([[10,  5,  8,  9, 15],
       [ 5,  8,  9, 15, 22],
       [ 8,  9, 15, 22, 26],
       [ 9, 15, 22, 26, 11],
       [15, 22, 26, 11, 15],
       [22, 26, 11, 15, 16],
       [26, 11, 15, 16, 18],
       [11, 15, 16, 18,  7],
       [15, 16, 18,  7,  4],
       [16, 18,  7,  4,  8],
       [18,  7,  4,  8, -2],
       [ 7,  4,  8, -2, -3],
       [ 4,  8, -2, -3, -4],
       [ 8, -2, -3, -4, -6],
       [-2, -3, -4, -6, -2],
       [-3, -4, -6, -2,  0],
       [-4, -6, -2,  0, 10],
       [-6, -2,  0, 10,  0],
       [-2,  0, 10,  0,  5],
       [ 0, 10,  0,  5,  8]])

现在,我们要对该数组的每一行应用相关系数计算。不幸的是,

np.corrcoef
不接受
axis
参数,它将计算应用于 整个 矩阵,并且没有提供对每行/列执行此操作的方法。

但是,两个向量的相关系数的计算非常简单:

linear correlation coefficient

在这里应用:

def vec_corrcoef(X, y, axis=1):
    Xm = np.mean(X, axis=axis, keepdims=True)
    ym = np.mean(y)
    n = np.sum((X - Xm) * (y - ym), axis=axis)
    d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
    return n / d

现在,用我们的数组和

arange
:

调用这个函数
cc = vec_corrcoef(swindow, np.arange(length))

给出了期望的结果:

array([ 0.60697698,  0.95894955,  0.98      ,  0.3279521 , -0.28709766,
       -0.61035663, -0.31390158, -0.17995394, -0.80041656, -0.78192905,
       -0.84702587, -0.81091772, -0.82464375, -0.86892667, -0.28347335,
        0.56568542,  0.86304424,  0.64326752,  0.45374261,  0.38135638])

要获取

x
,只需设置正确大小的
zeros
数组的适当索引即可。

注意:我认为你的

x
应该包含从
4
索引开始的非零值(因为滑动窗口已满),而不是从索引
5
开始。

x = np.zeros(data.shape)
x[-len(cc):] = cc

如果您确定您的值应从索引

5
开始,那么您可以执行以下操作:

x = np.zeros(data.shape)
x[length:] = cc[:-1] # Ignore the last value in cc

将原始方法的运行时间与此处答案中建议的方法进行比较:

  • f_OP_loopy
    是你的方法,它使用循环实现滑动窗口
  • f_PH_numpy
    是我的方法,它使用
    sliding_window_view
    和向量化函数来逐行计算向量相关系数
  • f_RA_numpy
    Rontogiannis 的方法,它平铺
    arange
    ,计算整个矩阵的相关系数,并且仅选择最后一列的前
    len(data) - length
  • f_RA_recur
    是 Rontogiannis 的递归方法,但我没有计时,因为它错过了最后一个相关系数。

Timing plot

  1. 毫不奇怪,纯 numpy 解决方案比循环方法更快。
  2. 我的 numpy 解决方案计算行相关系数,比下面的 Rontogiannis 所示的解决方案要快,因为平铺向量输入和计算 整个矩阵的相关性涉及额外的工作,只是为了丢弃我的方法避免了不需要的元素。
  3. 随着输入数据大小的增加,Rontogiannis 方法中的这种“额外工作”增加得太多,以至于其运行时间甚至比循环方法更糟糕!我不确定这个额外时间是在
    np.corrcoef
    计算中还是在
    np.tile
    操作。

注意:该图是在我的 2.2GHz i7 Macbook Air、8GB RAM、Python 3.10.7 和 numpy 1.23.3 上获得的。在 Google Colab 上也得到了类似的结果

如果您对计时代码感兴趣,这里是:

import timeit
import numpy as np
from matplotlib import pyplot as plt

def time_funcs(funcs, sizes, arg_gen, N=20):
    times = np.zeros((len(sizes), len(funcs)))
    gdict = globals().copy()
    for i, s in enumerate(sizes):
        args = arg_gen(s)
        print(args)
        for j, f in enumerate(funcs):
            gdict.update(locals())
            try:
                times[i, j] = timeit.timeit("f(*args)", globals=gdict, number=N) / N
                print(f"{i}/{len(sizes)}, {j}/{len(funcs)}, {times[i, j]}")
            except ValueError:
                print(f"ERROR in {f}, with args=", *args)
                
            
    return times

def plot_times(times, funcs):
    fig, ax = plt.subplots()
    for j, f in enumerate(funcs):
        ax.plot(sizes, times[:, j], label=f.__name__)
    
    
    ax.set_xlabel("Array size")
    ax.set_ylabel("Time per function call (s)")
    ax.set_xscale("log")
    ax.set_yscale("log")
    ax.legend()
    ax.grid()
    fig.tight_layout()
    return fig, ax

#%%
def arg_gen(n):
    return [np.random.randint(-100, 100, (n,)), 5]
    
#%%
def f_OP_loopy(data, length):
    x = np.zeros_like(data).astype('float32')
    for i in range(length-1, data.shape[0]):
        x[i] = np.corrcoef(data[i - length + 1:i+1], np.arange(length))[0, 1]
    return x
    

def f_PH_numpy(data, length):    
    swindow = np.lib.stride_tricks.sliding_window_view(data, (length,))
    cc = vec_corrcoef(swindow, np.arange(length))
    x = np.zeros(data.shape)
    x[-len(cc):] = cc
    return x

def f_RA_recur(data, length):
    return np.concatenate((
        np.zeros([length,]),
        rolling_correlation_recurse(data, 0, length)
    ))

def f_RA_numpy(data, length):
    n = len(data)
    cc = np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1]
    x = np.zeros(data.shape)
    x[-len(cc):] = cc
    return x
#%%

def rolling_correlation_recurse(data, i, length) :
    assert i+length < data.size
    left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
    if i+length+1 == data.size :
        return left
    right = rolling_correlation_recurse(data, i+1, length)
    return np.concatenate((left, right))

def vec_corrcoef(X, y, axis=1):
    Xm = np.mean(X, axis=axis, keepdims=True)
    ym = np.mean(y)
    n = np.sum((X - Xm) * (y - ym), axis=axis)
    d = np.sqrt(np.sum((X - Xm)**2, axis=axis) * np.sum((y - ym)**2))
    return n / d

#%% 
if __name__ == "__main__":
    #%% Set up sim
    sizes = [5, 10, 50, 100, 500, 1000, 5000, 10_000] #, 50_000, 100_000]
    funcs = [f_OP_loopy, #f_RA_recur,
             f_PH_numpy, f_RA_numpy]
    
    
    #%% Run timing
    time_fcalls = np.zeros((len(sizes), len(funcs))) * np.nan
    time_fcalls = time_funcs(funcs, sizes, arg_gen)
    
    fig, ax = plot_times(time_fcalls, funcs)
    ax.set_xlabel(f"Input size")

    plt.show()
    input("Enter x to exit")

1
投票

询问,你就会收到。这是使用递归的解决方案:

import numpy as np

data = np.array([10,5,8,9,15,22,26,11,15,16,18,7,4,8,-2,-3,-4,-6,-2,0,10,0,5,8])
length = 5

def rolling_correlation_recurse(data, i, length) :
    assert i+length < data.size
    left = np.array([np.corrcoef(data[i:i+length], np.arange(length))[0, 1]])
    if i+length+1 == data.size :
        return left
    right = rolling_correlation_recurse(data, i+1, length)
    return np.concatenate((left, right))

def rolling_correlation(data, length) :
    return np.concatenate((
        np.zeros([length,]),
        rolling_correlation_recurse(data, 0, length)
    ))

print(rolling_correlation(data, length))

编辑:这也是一个 numpy 解决方案:

n = len(data)
print(np.corrcoef(np.lib.stride_tricks.sliding_window_view(data, length), np.tile(np.arange(length), (n-length+1, 1)))[:n-length+1, -1])

1
投票

@pho 的回答非常好,但在我的用例中性能仍然不能令人满意。主要问题是滑动窗口平方和的计算仍然是针对每个窗口单独进行的。计算下一个滑动窗口的平方和时。本能地,人们可以简单地减去最早的元素并添加最新的元素。

因此我尝试寻找滑动窗口运算符,并从这个答案中获得灵感。粗略的想法是扩展皮尔逊相关系数公式并使用高效的移动窗口运算符来代替求和。

我们先从皮尔逊相关系数的公式开始:

提名人可以重写如下:

分母中,y的标准差只计算一次。 x的滑动窗口标准差的计算可以重写为:

经过这些转换,滑动窗口计算分为三项:X和y的卷积、X的平方均值和x的均值。通过

np.correlate(X,y)
可以高效地完成卷积。另外两个可以使用
scipy.ndimage.filters.uniform_filter
来完成,请参考这个问题中的答案。

生成的代码如下所示:

    from scipy.ndimage.filters import uniform_filter1d
    import numpy as np

    def window_corrcoef(arr1, arr2):
        length = len(arr2) # window size
        # Calculate the slicing index
        start_slice = length // 2
        end_slice = -(length - start_slice - 1)
        if end_slice == 0:
            end_slice = None
        # sliding window mean and squared mean of arr1
        c1 = uniform_filter1d(arr1, length, mode='constant')[start_slice:end_slice]
        c2 = uniform_filter1d(arr1 * arr1, length, mode='constant')[start_slice:end_slice]
        Xstd = np.sqrt(c2 - c1 * c1)
        # mean and std of arr2 will only be calculated once
        n = np.correlate(arr1, arr2) / length - c1 * np.mean(arr2)
        d = Xstd * np.std(arr2)
        return n / d

代码可能并不完美,因为我假设

arr2
的长度应该小于
arr1
。函数
uniform_filter1d
将输出与输入大小相同的数组。因此需要切片。切片代码同时处理
arr2
的奇数和偶数大小,但有点混乱。也许有更聪明的方法可以完成。

为了进行性能比较,我从 @pho 窃取了代码。我的方法被标记为

f_sj_scipy
。实际上,在我的用例中,arr1 的长度约为 60k,arr2 的长度约为 20k。差异可能高达 10 倍。

© www.soinside.com 2019 - 2024. All rights reserved.