我正在开展一个项目,开发一种可以检测心电图异常的算法。这是我的大学项目。我有一个很大的数据集,但现在我只处理一个文件,以便练习和理解流程。我正在尝试检测 R 峰值,但是当我绘制图像时,红点位于我的信号上方,因此这意味着它没有正确检测到它们。这对我来说是一条死胡同。我不知道如何解决它。这只是学士学位,所以我对此主题没有太多了解,但我正在尽力而为。
#Import the libraries
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import scipy.io
import os
from concurrent.futures import ThreadPoolExecutor
from scipy.signal import butter, lfilter
# Main directory containing subdirectories with .mat and .hea files
main_directory = r'C:\Users\Alex\Downloads\datasets\training\cpsc_2018\g1'
# Initialize an empty dictionary to store loaded data
loaded_data = {}
# Walk through all directories and subdirectories
for root, dirs, files in os.walk(main_directory):
for filename in files:
if filename.endswith('.mat') or filename.endswith('.hea'): # Check for both .mat and .hea files
try:
# Load .mat file
if filename.endswith('.mat'):
data = scipy.io.loadmat(os.path.join(root, filename))
# Load .hea file
elif filename.endswith('.hea'):
with open(os.path.join(root, filename), 'r') as f:
data = f.read() # Read the content of .hea file
# Store loaded data in dictionary
loaded_data[os.path.join(root, filename)] = data
except Exception as e:
print(f"Error loading file {filename}: {e}")
def baseline_correction(signal):
# Fit a polynomial to the signal and subtract it
poly_degree = 3 # Choose the degree of the polynomial
poly_coeffs = np.polyfit(np.arange(len(signal)), signal, poly_degree)
baseline = np.polyval(poly_coeffs, np.arange(len(signal)))
corrected_signal = signal - baseline
return corrected_signal
def artifact_removal(signal, threshold=100):
# Remove segments where signal exceeds the threshold
clean_signal = signal.copy()
clean_signal[np.abs(signal) > threshold] = 0
return clean_signal
def normalize_signal(signal):
min_val = np.min(signal)
max_val = np.max(signal)
normalized_signal = (signal - min_val) / (max_val - min_val)
return normalized_signal
def assess_signal_quality(signal, noise_level=10):
# Calculate SNR as the ratio of signal amplitude to noise level
signal_amplitude = np.max(signal) - np.min(signal)
snr = signal_amplitude / noise_level
# Return True if SNR is above a threshold, else False
return snr > 5 # Adjust threshold as needed
def detect_peaks(ecg_measurements,signal_frequency,gain):
filter_lowcut = 0.001
filter_highcut = 15.0
filter_order = 1
integration_window = 30 # Change proportionally when adjusting frequency (in samples).
findpeaks_limit = 0.35
findpeaks_spacing = 100 # Change proportionally when adjusting frequency (in samples).
refractory_period = 240 # Change proportionally when adjusting frequency (in samples).
qrs_peak_filtering_factor = 0.125
noise_peak_filtering_factor = 0.125
qrs_noise_diff_weight = 0.25
# Detection results.
qrs_peaks_indices = np.array([], dtype=int)
noise_peaks_indices = np.array([], dtype=int)
# Measurements filtering - 0-15 Hz band pass filter.
filtered_ecg_measurements = bandpass_filter(ecg_measurements, lowcut=filter_lowcut, highcut=filter_highcut, signal_freq=signal_frequency, filter_order=filter_order)
filtered_ecg_measurements[:5] = filtered_ecg_measurements[5]
# Derivative - provides QRS slope information.
differentiated_ecg_measurements = np.ediff1d(filtered_ecg_measurements)
# Squaring - intensifies values received in derivative.
squared_ecg_measurements = differentiated_ecg_measurements ** 2
# Moving-window integration.
integrated_ecg_measurements = np.convolve(squared_ecg_measurements, np.ones(integration_window)/integration_window)
# Fiducial mark - peak detection on integrated measurements.
detected_peaks_indices = findpeaks(data=integrated_ecg_measurements,
limit=findpeaks_limit,
spacing=findpeaks_spacing)
detected_peaks_values = integrated_ecg_measurements[detected_peaks_indices]
return detected_peaks_values,detected_peaks_indices
def findpeaks(data, spacing=1, limit=None):
"""
Janko Slavic peak detection algorithm and implementation.
https://github.com/jankoslavic/py-tools/tree/master/findpeaks
Finds peaks in `data` which are of `spacing` width and >=`limit`.
:param ndarray data: data
:param float spacing: minimum spacing to the next peak (should be 1 or more)
:param float limit: peaks should have value greater or equal
:return array: detected peaks indexes array
"""
len = data.size
x = np.zeros(len + 2 * spacing)
x[:spacing] = data[0] - 1.e-6
x[-spacing:] = data[-1] - 1.e-6
x[spacing:spacing + len] = data
peak_candidate = np.zeros(len)
peak_candidate[:] = True
for s in range(spacing):
start = spacing - s - 1
h_b = x[start: start + len] # before
start = spacing
h_c = x[start: start + len] # central
start = spacing + s + 1
h_a = x[start: start + len] # after
peak_candidate = np.logical_and(peak_candidate, np.logical_and(h_c > h_b, h_c > h_a))
ind = np.argwhere(peak_candidate)
ind = ind.reshape(ind.size)
if limit is not None:
ind = ind[data[ind] > limit]
return ind
def get_12ECG_features(data, header_data):
# Split the first line to get essential recording information
first_line = header_data[0].split()
ptID = first_line[0]
num_leads = int(first_line[1])
sample_Fs = int(first_line[2])
# Read the .hea file
hea_file_path = os.path.join(dataset_dir, hea_filename)
with open(hea_file_path, 'r') as f:
header_data = f.readlines()
# Extract age and sex from the header data
age, sex = None, None
for line in header_data:
if line.startswith('# Age'):
age = int(line.split(': ')[1])
elif line.startswith('# Sex'):
sex = line.split(': ')[1].strip()
sex = 1 if sex == 'Male' else 0 # Convert sex to binary (Male: 1, Female: 0)
elif line.startswith('# Dx'):
disease = int(line.split(': ')[1])
# Additional parsing logic for other features if needed
# Return extracted features
return ptID, num_leads, sample_Fs, age, sex, disease
# We are only using data from lead1
peaks,idx = detect_peaks(data[0],sample_Fs,gain_lead[0])
# mean
mean_RR = np.mean(idx/sample_Fs*1000)
mean_Peaks = np.mean(peaks*gain_lead[0])
# median
median_RR = np.median(idx/sample_Fs*1000)
median_Peaks = np.median(peaks*gain_lead[0])
# standard deviation
std_RR = np.std(idx/sample_Fs*1000)
std_Peaks = np.std(peaks*gain_lead[0])
# variance
var_RR = stats.tvar(idx/sample_Fs*1000)
var_Peaks = stats.tvar(peaks*gain_lead[0])
# Skewness
skew_RR = stats.skew(idx/sample_Fs*1000)
skew_Peaks = stats.skew(peaks*gain_lead[0])
# Kurtosis
kurt_RR = stats.kurtosis(idx/sample_Fs*1000)
kurt_Peaks = stats.kurtosis(peaks*gain_lead[0])
features = np.hstack([age,sex,mean_RR,mean_Peaks,median_RR,median_Peaks,std_RR,std_Peaks,var_RR,var_Peaks,skew_RR,skew_Peaks,kurt_RR,kurt_Peaks])
return features
# Load ECG data from .mat file
data = scipy.io.loadmat('C:\\Users\\Alex\\Downloads\\datasets\\training\\cpsc_2018\\g1\\A0001.mat')
# Extract ECG measurements from the loaded data
ecg_measurements = data['val']
# Flatten the array if needed (assuming you're working with a single channel)
ecg_measurements = ecg_measurements.flatten()
# Define signal frequency and gain
signal_frequency = 500 # Sampling frequency (Hz)
gain = 1000 # Gain (mV)
# Call detect_peaks function to detect R peaks
detected_peaks_values, detected_peaks_indices = detect_peaks(ecg_measurements, signal_frequency, gain)
# Plot the ECG signal with detected R peaks
plt.figure(figsize=(10, 6))
plt.plot(ecg_measurements, label='ECG Signal')
plt.scatter(detected_peaks_indices, detected_peaks_values, color='red', label='R Peaks')
plt.title('ECG Signal with Detected R Peaks')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.legend()
plt.grid(True)
# Stretch the x-axis by setting xlim to cover a smaller range of samples
plt.xlim(0, len(ecg_measurements) // 20) # Adjust the divisor as neede
plt.show()
您的散点图正在绘制
detected_peaks_value
。它已在 detect_peaks
中平方,因此始终为正值,并且不是轨迹的值。看起来它位于时间轴上的正确位置(在幅度最大的峰值旁边,即使它是负数)。
如果你想突出峰值,请尝试改变
plt.scatter(detected_peaks_indices, detected_peaks_values, color='red', label='R Peaks')
到
plt.scatter(detected_peaks_indices, ecg_measurements[detected_peaks_indicies], color='red', label='R Peaks')