使用互相关查找两个信号的时移

问题描述 投票:0回答:4

我有两个彼此相关的信号,并且由两个不同的测量设备同时捕获。 由于两个测量时间不同步,因此我想计算它们之间有一个小的时间延迟。此外,我需要知道哪个信号是主导信号。

可以假设以下情况:

  • 没有或只有很少的噪音
  • 算法的速度不是问题,只有准确性和鲁棒性
  • 以高采样率(>10 kHz)捕获信号几秒钟
  • 预计延迟时间为 < 0.5s

我想为此目的使用互相关。 非常感谢任何关于如何在 Python 中实现这一点的建议。

请告诉我是否应该提供更多信息以便找到最合适的算法。

python python-2.7 signal-processing lag cross-correlation
4个回答
22
投票

一种流行的方法:时移是最大互相关系数对应的滞后。以下是它的工作原理:

import matplotlib.pyplot as plt
from scipy import signal
import numpy as np


def lag_finder(y1, y2, sr):
    n = len(y1)

    corr = signal.correlate(y2, y1, mode='same') / np.sqrt(signal.correlate(y1, y1, mode='same')[int(n/2)] * signal.correlate(y2, y2, mode='same')[int(n/2)])

    delay_arr = np.linspace(-0.5*n/sr, 0.5*n/sr, n)
    delay = delay_arr[np.argmax(corr)]
    print('y2 is ' + str(delay) + ' behind y1')

    plt.figure()
    plt.plot(delay_arr, corr)
    plt.title('Lag: ' + str(np.round(delay, 3)) + ' s')
    plt.xlabel('Lag')
    plt.ylabel('Correlation coeff')
    plt.show()

# Sine sample with some noise and copy to y1 and y2 with a 1-second lag
sr = 1024
y = np.linspace(0, 2*np.pi, sr)
y = np.tile(np.sin(y), 5)
y += np.random.normal(0, 5, y.shape)
y1 = y[sr:4*sr]
y2 = y[:3*sr]

lag_finder(y1, y2, sr)

在有噪声信号的情况下,通常首先应用带通滤波器。对于谐波噪声,可以通过识别和消除频谱中存在的频率尖峰来消除它们。


2
投票

Numpy 具有满足您需求的函数

correlate
https://docs.scipy.org/doc/numpy/reference/ generated/numpy.correlate.html


1
投票

Scipy 有一个有用的函数,称为 correlation_lags,它使用其他答案提到的底层 correlate 函数来查找时间延迟。该页面底部显示的示例很有用:

from scipy import signal
from numpy.random import default_rng

rng = default_rng()
x = rng.standard_normal(1000)
y = np.concatenate([rng.standard_normal(100), x])
correlation = signal.correlate(x, y, mode="full")
lags = signal.correlation_lags(x.size, y.size, mode="full")
lag = lags[np.argmax(correlation)]

那么

lag
将是
-100


0
投票

为了补充Reveille上面的答案(我重现了他的算法),我想指出一些预处理输入信号的想法。 由于似乎没有万能的(周期持续时间、分辨率、偏移、噪声、信号类型……),您可以使用它。 在我的示例中,窗函数的应用改进了检测到的相移(在离散化的分辨率内)。

import numpy as np
from scipy import signal
import matplotlib.pyplot as plt

r2d = 180.0/np.pi   # conversion factor RAD-to-DEG
delta_phi_true = 50.0/r2d

def detect_phase_shift(t, x, y):
    '''detect phase shift between two signals from cross correlation maximum'''
    N = len(t)
    L = t[-1] - t[0]
    
    cc = signal.correlate(x, y, mode="same")
    i_max = np.argmax(cc)
    phi_shift = np.linspace(-0.5*L, 0.5*L , N)
    delta_phi = phi_shift[i_max]

    print("true delta phi = {} DEG".format(delta_phi_true*r2d))
    print("detected delta phi = {} DEG".format(delta_phi*r2d))
    print("error = {} DEG    resolution for comparison dphi = {} DEG".format((delta_phi-delta_phi_true)*r2d, dphi*r2d))
    print("ratio = {}".format(delta_phi/delta_phi_true))
    return delta_phi


L = np.pi*10+2     # interval length [RAD], for generality not multiple period
N = 1001   # interval division, odd number is better (center is integer)
noise_intensity = 0.0
X = 0.5   # amplitude of first signal..
Y = 2.0   # ..and second signal

phi = np.linspace(0, L, N)
dphi = phi[1] - phi[0]

'''generate signals'''
nx = noise_intensity*np.random.randn(N)*np.sqrt(dphi)   
ny = noise_intensity*np.random.randn(N)*np.sqrt(dphi)
x_raw = X*np.sin(phi) + nx
y_raw = Y*np.sin(phi+delta_phi_true) + ny

'''preprocessing signals'''
x = x_raw.copy() 
y = y_raw.copy()
window = signal.windows.hann(N)   # Hanning window 
#x -= np.mean(x)   # zero mean
#y -= np.mean(y)   # zero mean
#x /= np.std(x)    # scale
#y /= np.std(y)    # scale
x *= window       # reduce effect of finite length 
y *= window       # reduce effect of finite length 

print(" -- using raw data -- ")
delta_phi_raw = detect_phase_shift(phi, x_raw, y_raw)

print(" -- using preprocessed data -- ")
delta_phi_preprocessed = detect_phase_shift(phi, x, y)

没有噪声(确定性),输出为

 -- using raw data -- 
true delta phi = 50.0 DEG
detected delta phi = 47.864788975654 DEG
...
 -- using preprocessed data -- 
true delta phi = 50.0 DEG
detected delta phi = 49.77938053468019 DEG
...
© www.soinside.com 2019 - 2024. All rights reserved.