通用 Goertzel 算法比 FFT 更好的峰值检测:用真实数据偏移频率?

问题描述 投票:0回答:1

我已经从 Matlab 代码中翻译了 Python 中的 Generalized Goertzel 技术的算法,可以在here找到。

我在实际数据上使用它时遇到了麻烦,并且仅在实际数据上使用它:生成具有 5 个正弦分量的测试“合成”信号,Goertzel 返回正确的频率,显然比 FFT 具有更好的精度;两种技术都是一致的。

但是,如果我提供 N 个样本的市场数据,FFT 将给出预期的最低频率 f = 1/N; Goertzel 返回所有高于 1 的频率。数据的时间范围是 1 小时,但数据没有时间戳标记,也可能是秒,所以我的期望是计算频率变换的两种方法应该返回,除了不同的精度,频域上的相同谐波。

为什么我在一种方法中获得最低频率,但使用另一种方法和真实数据时却大于 1?

import numpy as np

def goertzel_general_shortened(x, indvec, maxes_tollerance = 100):
    # Check input arguments
    if len(indvec) < 1:
        raise ValueError('Not enough input arguments')
    if not isinstance(x, np.ndarray) or x.size == 0:
        raise ValueError('X must be a nonempty numpy array')
    if not isinstance(indvec, np.ndarray) or indvec.size == 0:
        raise ValueError('INDVEC must be a nonempty numpy array')
    if np.iscomplex(indvec).any():
        raise ValueError('INDVEC must contain real numbers')
    
    lx = len(x)
    x = x.reshape(lx, 1)  # forcing x to be a column vector
    
    # Initialization
    no_freq = len(indvec)
    y = np.zeros((no_freq,), dtype=complex)
    
    # Computation via second-order system
    for cnt_freq in range(no_freq):
        # Precompute constants
        pik_term = 2 * np.pi * indvec[cnt_freq] / lx
        cos_pik_term2 = 2 * np.cos(pik_term)
        cc = np.exp(-1j * pik_term)  # complex constant
        
        # State variables
        s0 = 0
        s1 = 0
        s2 = 0
        
        # Main loop
        for ind in range(lx - 1):
            s0 = x[ind] + cos_pik_term2 * s1 - s2
            s2 = s1
            s1 = s0
        
        # Final computations
        s0 = x[lx - 1] + cos_pik_term2 * s1 - s2
        y[cnt_freq] = s0 - s1 * cc
        
        # Complex multiplication substituting the last iteration
        # and correcting the phase for potentially non-integer valued
        # frequencies at the same time
        y[cnt_freq] = y[cnt_freq] * np.exp(-1j * pik_term * (lx - 1))
    
    return y

这里是综合测试 5 分量信号的 FFT 和 Goertzel 变换图表

这里是戈策尔那个

原始频率是

signal_frequencies = [30.5, 47.4, 80.8, 120.7, 133]

相反,如果我尝试下载市场数据

data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")

并尝试转换 SPY 的数据['Close'],这是我通过 N = 800 个样本的 FFT 变换得到的结果

以及前 2 个组件上的重建信号(不太好)

这就是我通过 Goertzel 变换得到的结果

请注意,FFT 的第一个峰值低于 0.005,而 Goertzel 的第一个峰值高于 1。

这是我在 SPY 数据上测试 FFT 的方式

import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots

def analyze_and_plot(data, num_samples, start_date, num_harmonics):

    num_harmonics = num_harmonics *1
    # Seleziona i dati nell'intervallo specificato
    original_data = data
    data = data[data.index >= start_date]

    # Estrai i campioni desiderati
    data = data.head(num_samples)

    # Calcola la FFT dei valori "Close"
    fft_result = np.fft.fft(data["Close"].values)
    frequency_range = np.fft.fftfreq(len(fft_result))


    print("Frequencies: ")
    print(frequency_range)
    print("N Frequencies: ")
    print(len(frequency_range))


    print("First frequencies magnitude: ")
    print(np.abs(fft_result[0:num_harmonics]))


    # Trova le armoniche dominanti
    # top_harmonics = np.argsort(np.abs(fft_result))[::-1][:num_harmonics]
    top_harmonics = np.argsort(np.abs(fft_result[0:400]))[::-1][1:(num_harmonics + 1)] # skip first one

    print("Top harmonics: ")
    print(top_harmonics)

    # top_harmonics = [1, 4]#, 8, 5, 9]



    # Creazione del grafico per lo spettro
    spectrum_trace = go.Scatter(x=frequency_range, y=np.abs(fft_result), mode='lines', name='FFT Spectrum')
    fig_spectrum = go.Figure(spectrum_trace)
    fig_spectrum.update_layout(title="FFT Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))

    # Calcola la ricostruzione basata sulle prime N armoniche
    reconstructed_signal = np.zeros(len(data))

    time = np.linspace(0, num_samples, num_samples, endpoint=False)
    # print('time')
    # print(time)
    for harmonic_index in top_harmonics[:num_harmonics]:
      amplitude = np.abs(fft_result[harmonic_index]) #.real
      phase = np.angle(fft_result[harmonic_index])
      frequency = frequency_range[harmonic_index]
      reconstructed_signal += amplitude * np.cos(2 * np.pi * frequency * time + phase)


    # print('first reconstructed_signal len')
    # print(len(reconstructed_signal))

    zeros = np.zeros(len(original_data) - 2*len(data))
    reconstructed_signal = np.concatenate((reconstructed_signal, reconstructed_signal), axis = 0)

    # print('second reconstructed_signal len')
    # print(len(reconstructed_signal))

    reconstructed_signal = np.concatenate((reconstructed_signal, zeros), axis = 0)

    original_data['reconstructed_signal'] = reconstructed_signal


    # print('reconstructed_signal len')
    # print(len(reconstructed_signal))

    # print('original_data len')
    # print(len(original_data))

    # print('reconstructed_signal[300:320]')
    # print(reconstructed_signal[290:320])

    # print('original_data[300:320]')
    # print(original_data[290:320][['Close', 'reconstructed_signal']])


    # reconstructed_signal = np.fft.ifft(fft_result[top_harmonics[:num_harmonics]])

    # print('reconstructed_signal')
    # print(reconstructed_signal)

    # Converte i valori complessi in valori reali per la ricostruzione
    # reconstructed_signal_real = reconstructed_signal.real

    # Creazione del secondo grafico con due subplot
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))

    # Aggiungi il grafico di "Close" originale al primo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)


    # Aggiungi il grafico della ricostruzione al secondo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)

    # Aggiorna il layout del secondo grafico
    fig.update_xaxes(title_text="Time", row=2, col=1)
    fig.update_yaxes(title_text="Value", row=1, col=1)
    fig.update_yaxes(title_text="Value", row=2, col=1)

    # Aggiorna il layout generale
    fig.update_layout(title="Close Analysis and Reconstruction")

    # Visualizza il grafico dello spettro
    fig_spectrum.show()

    # fig.update_layout(xaxis = dict(type="category"))

    # Aggiorna il layout dell'asse X per includere tutti i dati
    # fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)

    fig.update_xaxes(type="category", row=1, col=1)
    fig.update_xaxes(type="category", row=2, col=1)

    # Visualizza il secondo grafico con i subplot
    fig.show()

# Esempio di utilizzo
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)

以及Goertzel上SPY数据的测试

import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots

def analyze_and_plot(data, num_samples, start_date, num_harmonics):


    # Seleziona i dati nell'intervallo specificato
    original_data = data
    data = data[data.index >= start_date]

    # Estrai i campioni desiderati
    data = data.head(num_samples)

    # Frequenze desiderate
    frequency_range = np.arange(0, 20, 0.001) 


    # Calcola lo spettro delle frequenze utilizzando la funzione Goertzel
    transform = goertzel_general_shortened(data['Close'].values, frequency_range)

    harmonics_amplitudes = np.abs(transform)

    frequency_range = frequency_range  
    # Creazione del grafico per lo spettro
    spectrum_trace = go.Scatter(x=frequency_range, y=harmonics_amplitudes, mode='lines', name='FFT Spectrum')
    fig_spectrum = go.Figure(spectrum_trace)
    fig_spectrum.update_layout(title="Frequency Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))

    # Visualizza il grafico dello spettro
    fig_spectrum.show()


    peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
    peak_frequencies = frequency_range[peaks_indexes] 
    peak_amplitudes = harmonics_amplitudes[peaks_indexes]

    print('peaks_indexes')
    print(peaks_indexes[0:30])
    print('peak_frequencies')
    print(peak_frequencies[0:30])
    print('peak_amplitudes')
    print(peak_amplitudes[0:30])

    lower_freq_sort_peak_indexes = np.sort(peaks_indexes)[0:num_harmonics] # lower indexes <--> lower frequencies


    higher_amplitudes_sort_peak_indexes = peaks_indexes[np.argsort(harmonics_amplitudes[peaks_indexes])[::-1]][0:num_harmonics]

    print('higher_amplitudes_sort_peak_indexes')
    print(higher_amplitudes_sort_peak_indexes[0:10])



    # used_indexes = lower_freq_sort_peak_indexes
    used_indexes = higher_amplitudes_sort_peak_indexes

    # Creazione del segnale ricostruito utilizzando i picchi
    time = np.linspace(0, num_samples, num_samples, endpoint=False) 
    reconstructed_signal = np.zeros(len(time), dtype=float)

    print('num_samples')
    print(num_samples)
    print('time[0:20]')
    print(time[0:20])


    print('reconstructed_signal')
    print(reconstructed_signal[0:10])

    for index in used_indexes:

        phase = np.angle(transform[index])  
        amplitude = np.abs(transform[index])
        frequency = frequency_range[index]
        print('phase')
        print(phase)
        print('amplitude')
        print(amplitude)
        print('frequency')
        print(frequency)
        reconstructed_signal += amplitude * np.sin(2 * np.pi * frequency * time + phase)


    # Estrai la parte reale del segnale ricostruito
    reconstructed_signal_real = reconstructed_signal

    print('reconstructed_signal[1]')
    print(reconstructed_signal[1])

    print('reconstructed_signal.shape')
    print(reconstructed_signal.shape)

    zeros = np.zeros(len(original_data) - 2*num_samples)
    reconstructed_signal_real = np.concatenate((reconstructed_signal_real, reconstructed_signal_real), axis = 0)
    print('reconstructed_signal_real.shape')
    print(reconstructed_signal_real.shape)
    reconstructed_signal_real = np.concatenate((reconstructed_signal_real, zeros), axis = 0)
    print('reconstructed_signal_real.shape')
    print(reconstructed_signal_real.shape)
    original_data['reconstructed_signal'] = reconstructed_signal_real



    # Creazione del secondo grafico con due subplot
    fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))

    # Aggiungi il grafico di "Close" originale al primo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)


    # Aggiungi il grafico della ricostruzione al secondo subplot
    fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)

    # Aggiorna il layout del secondo grafico
    fig.update_xaxes(title_text="Time", row=2, col=1)
    fig.update_yaxes(title_text="Value", row=1, col=1)
    fig.update_yaxes(title_text="Value", row=2, col=1)

    # Aggiorna il layout generale
    fig.update_layout(title="Close Analysis and Reconstruction")


    # fig.update_layout(xaxis = dict(type="category"))

    # Aggiorna il layout dell'asse X per includere tutti i dati
    # fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)

    fig.update_xaxes(type="category", row=1, col=1)
    fig.update_xaxes(type="category", row=2, col=1)

    # Visualizza il secondo grafico con i subplot
    fig.show()

# Esempio di utilizzo

analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)
python fft frequency-analysis goertzel-algorithm
1个回答
0
投票

标准化 SPY 数据:

import numpy as np
def normalize_values(values, max_value=1.0):
    norm_values = (values - np.mean(values)) / np.var(values)**0.5
    norm_values += np.abs(np.min(norm_values))
    norm_values *= (max_value / np.max(norm_values))
    return norm_values
© www.soinside.com 2019 - 2024. All rights reserved.