我已经从 Matlab 代码中翻译了 Python 中的 Generalized Goertzel 技术的算法,可以在here找到。
我在实际数据上使用它时遇到了麻烦,并且仅在实际数据上使用它:生成具有 5 个正弦分量的测试“合成”信号,Goertzel 返回正确的频率,显然比 FFT 具有更好的精度;两种技术都是一致的。
但是,如果我提供 N 个样本的市场数据,FFT 将给出预期的最低频率 f = 1/N; Goertzel 返回所有高于 1 的频率。数据的时间范围是 1 小时,但数据没有时间戳标记,也可能是秒,所以我的期望是计算频率变换的两种方法应该返回,除了不同的精度,频域上的相同谐波。
为什么我在一种方法中获得最低频率,但使用另一种方法和真实数据时却大于 1?
import numpy as np
def goertzel_general_shortened(x, indvec, maxes_tollerance = 100):
# Check input arguments
if len(indvec) < 1:
raise ValueError('Not enough input arguments')
if not isinstance(x, np.ndarray) or x.size == 0:
raise ValueError('X must be a nonempty numpy array')
if not isinstance(indvec, np.ndarray) or indvec.size == 0:
raise ValueError('INDVEC must be a nonempty numpy array')
if np.iscomplex(indvec).any():
raise ValueError('INDVEC must contain real numbers')
lx = len(x)
x = x.reshape(lx, 1) # forcing x to be a column vector
# Initialization
no_freq = len(indvec)
y = np.zeros((no_freq,), dtype=complex)
# Computation via second-order system
for cnt_freq in range(no_freq):
# Precompute constants
pik_term = 2 * np.pi * indvec[cnt_freq] / lx
cos_pik_term2 = 2 * np.cos(pik_term)
cc = np.exp(-1j * pik_term) # complex constant
# State variables
s0 = 0
s1 = 0
s2 = 0
# Main loop
for ind in range(lx - 1):
s0 = x[ind] + cos_pik_term2 * s1 - s2
s2 = s1
s1 = s0
# Final computations
s0 = x[lx - 1] + cos_pik_term2 * s1 - s2
y[cnt_freq] = s0 - s1 * cc
# Complex multiplication substituting the last iteration
# and correcting the phase for potentially non-integer valued
# frequencies at the same time
y[cnt_freq] = y[cnt_freq] * np.exp(-1j * pik_term * (lx - 1))
return y
这里是综合测试 5 分量信号的 FFT 和 Goertzel 变换图表
这里是戈策尔那个
原始频率是
signal_frequencies = [30.5, 47.4, 80.8, 120.7, 133]
相反,如果我尝试下载市场数据
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
并尝试转换 SPY 的数据['Close'],这是我通过 N = 800 个样本的 FFT 变换得到的结果
以及前 2 个组件上的重建信号(不太好)
这就是我通过 Goertzel 变换得到的结果
请注意,FFT 的第一个峰值低于 0.005,而 Goertzel 的第一个峰值高于 1。
这是我在 SPY 数据上测试 FFT 的方式
import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots
def analyze_and_plot(data, num_samples, start_date, num_harmonics):
num_harmonics = num_harmonics *1
# Seleziona i dati nell'intervallo specificato
original_data = data
data = data[data.index >= start_date]
# Estrai i campioni desiderati
data = data.head(num_samples)
# Calcola la FFT dei valori "Close"
fft_result = np.fft.fft(data["Close"].values)
frequency_range = np.fft.fftfreq(len(fft_result))
print("Frequencies: ")
print(frequency_range)
print("N Frequencies: ")
print(len(frequency_range))
print("First frequencies magnitude: ")
print(np.abs(fft_result[0:num_harmonics]))
# Trova le armoniche dominanti
# top_harmonics = np.argsort(np.abs(fft_result))[::-1][:num_harmonics]
top_harmonics = np.argsort(np.abs(fft_result[0:400]))[::-1][1:(num_harmonics + 1)] # skip first one
print("Top harmonics: ")
print(top_harmonics)
# top_harmonics = [1, 4]#, 8, 5, 9]
# Creazione del grafico per lo spettro
spectrum_trace = go.Scatter(x=frequency_range, y=np.abs(fft_result), mode='lines', name='FFT Spectrum')
fig_spectrum = go.Figure(spectrum_trace)
fig_spectrum.update_layout(title="FFT Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))
# Calcola la ricostruzione basata sulle prime N armoniche
reconstructed_signal = np.zeros(len(data))
time = np.linspace(0, num_samples, num_samples, endpoint=False)
# print('time')
# print(time)
for harmonic_index in top_harmonics[:num_harmonics]:
amplitude = np.abs(fft_result[harmonic_index]) #.real
phase = np.angle(fft_result[harmonic_index])
frequency = frequency_range[harmonic_index]
reconstructed_signal += amplitude * np.cos(2 * np.pi * frequency * time + phase)
# print('first reconstructed_signal len')
# print(len(reconstructed_signal))
zeros = np.zeros(len(original_data) - 2*len(data))
reconstructed_signal = np.concatenate((reconstructed_signal, reconstructed_signal), axis = 0)
# print('second reconstructed_signal len')
# print(len(reconstructed_signal))
reconstructed_signal = np.concatenate((reconstructed_signal, zeros), axis = 0)
original_data['reconstructed_signal'] = reconstructed_signal
# print('reconstructed_signal len')
# print(len(reconstructed_signal))
# print('original_data len')
# print(len(original_data))
# print('reconstructed_signal[300:320]')
# print(reconstructed_signal[290:320])
# print('original_data[300:320]')
# print(original_data[290:320][['Close', 'reconstructed_signal']])
# reconstructed_signal = np.fft.ifft(fft_result[top_harmonics[:num_harmonics]])
# print('reconstructed_signal')
# print(reconstructed_signal)
# Converte i valori complessi in valori reali per la ricostruzione
# reconstructed_signal_real = reconstructed_signal.real
# Creazione del secondo grafico con due subplot
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))
# Aggiungi il grafico di "Close" originale al primo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)
# Aggiungi il grafico della ricostruzione al secondo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)
# Aggiorna il layout del secondo grafico
fig.update_xaxes(title_text="Time", row=2, col=1)
fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_yaxes(title_text="Value", row=2, col=1)
# Aggiorna il layout generale
fig.update_layout(title="Close Analysis and Reconstruction")
# Visualizza il grafico dello spettro
fig_spectrum.show()
# fig.update_layout(xaxis = dict(type="category"))
# Aggiorna il layout dell'asse X per includere tutti i dati
# fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)
fig.update_xaxes(type="category", row=1, col=1)
fig.update_xaxes(type="category", row=2, col=1)
# Visualizza il secondo grafico con i subplot
fig.show()
# Esempio di utilizzo
data = yf.download("SPY", start="2022-01-01", end="2023-12-31", interval="1h")
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)
以及Goertzel上SPY数据的测试
import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots
def analyze_and_plot(data, num_samples, start_date, num_harmonics):
# Seleziona i dati nell'intervallo specificato
original_data = data
data = data[data.index >= start_date]
# Estrai i campioni desiderati
data = data.head(num_samples)
# Frequenze desiderate
frequency_range = np.arange(0, 20, 0.001)
# Calcola lo spettro delle frequenze utilizzando la funzione Goertzel
transform = goertzel_general_shortened(data['Close'].values, frequency_range)
harmonics_amplitudes = np.abs(transform)
frequency_range = frequency_range
# Creazione del grafico per lo spettro
spectrum_trace = go.Scatter(x=frequency_range, y=harmonics_amplitudes, mode='lines', name='FFT Spectrum')
fig_spectrum = go.Figure(spectrum_trace)
fig_spectrum.update_layout(title="Frequency Spectrum", xaxis=dict(title="Frequency"), yaxis=dict(title="Magnitude"))
# Visualizza il grafico dello spettro
fig_spectrum.show()
peaks_indexes = argrelmax(harmonics_amplitudes, order = 10)[0] # find indexes of peaks
peak_frequencies = frequency_range[peaks_indexes]
peak_amplitudes = harmonics_amplitudes[peaks_indexes]
print('peaks_indexes')
print(peaks_indexes[0:30])
print('peak_frequencies')
print(peak_frequencies[0:30])
print('peak_amplitudes')
print(peak_amplitudes[0:30])
lower_freq_sort_peak_indexes = np.sort(peaks_indexes)[0:num_harmonics] # lower indexes <--> lower frequencies
higher_amplitudes_sort_peak_indexes = peaks_indexes[np.argsort(harmonics_amplitudes[peaks_indexes])[::-1]][0:num_harmonics]
print('higher_amplitudes_sort_peak_indexes')
print(higher_amplitudes_sort_peak_indexes[0:10])
# used_indexes = lower_freq_sort_peak_indexes
used_indexes = higher_amplitudes_sort_peak_indexes
# Creazione del segnale ricostruito utilizzando i picchi
time = np.linspace(0, num_samples, num_samples, endpoint=False)
reconstructed_signal = np.zeros(len(time), dtype=float)
print('num_samples')
print(num_samples)
print('time[0:20]')
print(time[0:20])
print('reconstructed_signal')
print(reconstructed_signal[0:10])
for index in used_indexes:
phase = np.angle(transform[index])
amplitude = np.abs(transform[index])
frequency = frequency_range[index]
print('phase')
print(phase)
print('amplitude')
print(amplitude)
print('frequency')
print(frequency)
reconstructed_signal += amplitude * np.sin(2 * np.pi * frequency * time + phase)
# Estrai la parte reale del segnale ricostruito
reconstructed_signal_real = reconstructed_signal
print('reconstructed_signal[1]')
print(reconstructed_signal[1])
print('reconstructed_signal.shape')
print(reconstructed_signal.shape)
zeros = np.zeros(len(original_data) - 2*num_samples)
reconstructed_signal_real = np.concatenate((reconstructed_signal_real, reconstructed_signal_real), axis = 0)
print('reconstructed_signal_real.shape')
print(reconstructed_signal_real.shape)
reconstructed_signal_real = np.concatenate((reconstructed_signal_real, zeros), axis = 0)
print('reconstructed_signal_real.shape')
print(reconstructed_signal_real.shape)
original_data['reconstructed_signal'] = reconstructed_signal_real
# Creazione del secondo grafico con due subplot
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.1, subplot_titles=("Original Close", "Reconstructed Close"))
# Aggiungi il grafico di "Close" originale al primo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data["Close"], mode="lines", name="Original Close"), row=1, col=1)
# Aggiungi il grafico della ricostruzione al secondo subplot
fig.add_trace(go.Scatter(x=original_data.index, y=original_data['reconstructed_signal'] , mode="lines", name="Reconstructed Close"), row=2, col=1)
# Aggiorna il layout del secondo grafico
fig.update_xaxes(title_text="Time", row=2, col=1)
fig.update_yaxes(title_text="Value", row=1, col=1)
fig.update_yaxes(title_text="Value", row=2, col=1)
# Aggiorna il layout generale
fig.update_layout(title="Close Analysis and Reconstruction")
# fig.update_layout(xaxis = dict(type="category"))
# Aggiorna il layout dell'asse X per includere tutti i dati
# fig.update_xaxes(range=[original_data.index.min(), original_data.index.max()], row=2, col=1)
fig.update_xaxes(type="category", row=1, col=1)
fig.update_xaxes(type="category", row=2, col=1)
# Visualizza il secondo grafico con i subplot
fig.show()
# Esempio di utilizzo
analyze_and_plot(data, num_samples=800, start_date="2023-01-01", num_harmonics=2)
标准化 SPY 数据:
import numpy as np
def normalize_values(values, max_value=1.0):
norm_values = (values - np.mean(values)) / np.var(values)**0.5
norm_values += np.abs(np.min(norm_values))
norm_values *= (max_value / np.max(norm_values))
return norm_values