ML | ECG signal preprocessing, peak detection and plotting

Problem description

I'm working on a project to develop an algorithm that can detect abnormalities in an ECG; it's my university project. I have a large dataset, but for now I'm only working with a single file so I can practise and understand the workflow. I'm trying to detect the R peaks, but when I plot the result the red dots sit above my signal, which means they are not being detected correctly. I'm stuck and don't know how to fix it. This is only a bachelor's project, so I don't know much about the topic yet, but I'm doing my best.

Peak detection plot

#Import the libraries
import random
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sklearn
import scipy.io
import os
from concurrent.futures import ThreadPoolExecutor
from scipy.signal import butter, lfilter
from scipy import stats  # used below for variance, skewness and kurtosis
# Main directory containing subdirectories with .mat and .hea files
main_directory = r'C:\Users\Alex\Downloads\datasets\training\cpsc_2018\g1'

# Initialize an empty dictionary to store loaded data
loaded_data = {}

# Walk through all directories and subdirectories
for root, dirs, files in os.walk(main_directory):
    for filename in files:
        if filename.endswith('.mat') or filename.endswith('.hea'):  # Check for both .mat and .hea files
            try:
                # Load .mat file
                if filename.endswith('.mat'):
                    data = scipy.io.loadmat(os.path.join(root, filename))
                # Load .hea file
                elif filename.endswith('.hea'):
                    with open(os.path.join(root, filename), 'r') as f:
                        data = f.read()  # Read the content of .hea file
                # Store loaded data in dictionary
                loaded_data[os.path.join(root, filename)] = data
            except Exception as e:
                print(f"Error loading file {filename}: {e}")
def baseline_correction(signal):
    # Fit a polynomial to the signal and subtract it
    poly_degree = 3  # Choose the degree of the polynomial
    poly_coeffs = np.polyfit(np.arange(len(signal)), signal, poly_degree)
    baseline = np.polyval(poly_coeffs, np.arange(len(signal)))
    corrected_signal = signal - baseline
    return corrected_signal
def artifact_removal(signal, threshold=100):
    # Remove segments where signal exceeds the threshold
    clean_signal = signal.copy()
    clean_signal[np.abs(signal) > threshold] = 0
    return clean_signal
def normalize_signal(signal):
    min_val = np.min(signal)
    max_val = np.max(signal)
    normalized_signal = (signal - min_val) / (max_val - min_val)
    return normalized_signal
def assess_signal_quality(signal, noise_level=10):
    # Calculate SNR as the ratio of signal amplitude to noise level
    signal_amplitude = np.max(signal) - np.min(signal)
    snr = signal_amplitude / noise_level
    # Return True if SNR is above a threshold, else False
    return snr > 5  # Adjust threshold as needed
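# NOTE: detect_peaks below relies on a bandpass_filter helper that is not included in
# the code above. A minimal sketch, assuming the usual Butterworth band-pass built from
# the butter/lfilter imports (the signature is inferred from the call in detect_peaks):
def bandpass_filter(data, lowcut, highcut, signal_freq, filter_order):
    # Normalise the cut-off frequencies by the Nyquist frequency
    nyquist_freq = 0.5 * signal_freq
    low = lowcut / nyquist_freq
    high = highcut / nyquist_freq
    # Design the Butterworth band-pass filter and apply it causally
    b, a = butter(filter_order, [low, high], btype="band")
    return lfilter(b, a, data)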
def detect_peaks(ecg_measurements,signal_frequency,gain):

        filter_lowcut = 0.001
        filter_highcut = 15.0
        filter_order = 1
        integration_window = 30  # Change proportionally when adjusting frequency (in samples).
        findpeaks_limit = 0.35
        findpeaks_spacing = 100  # Change proportionally when adjusting frequency (in samples).
        refractory_period = 240  # Change proportionally when adjusting frequency (in samples).
        qrs_peak_filtering_factor = 0.125
        noise_peak_filtering_factor = 0.125
        qrs_noise_diff_weight = 0.25


        # Detection results.
        qrs_peaks_indices = np.array([], dtype=int)
        noise_peaks_indices = np.array([], dtype=int)


        # Measurements filtering - 0-15 Hz band pass filter.
        filtered_ecg_measurements = bandpass_filter(ecg_measurements, lowcut=filter_lowcut, highcut=filter_highcut, signal_freq=signal_frequency, filter_order=filter_order)

        filtered_ecg_measurements[:5] = filtered_ecg_measurements[5]

        # Derivative - provides QRS slope information.
        differentiated_ecg_measurements = np.ediff1d(filtered_ecg_measurements)

        # Squaring - intensifies values received in derivative.
        squared_ecg_measurements = differentiated_ecg_measurements ** 2

        # Moving-window integration.
        integrated_ecg_measurements = np.convolve(squared_ecg_measurements, np.ones(integration_window)/integration_window)

        # Fiducial mark - peak detection on integrated measurements.
        detected_peaks_indices = findpeaks(data=integrated_ecg_measurements,
                                                     limit=findpeaks_limit,
                                                     spacing=findpeaks_spacing)

        detected_peaks_values = integrated_ecg_measurements[detected_peaks_indices]

        return detected_peaks_values,detected_peaks_indices
def findpeaks(data, spacing=1, limit=None):
        """
        Janko Slavic peak detection algorithm and implementation.
        https://github.com/jankoslavic/py-tools/tree/master/findpeaks
        Finds peaks in `data` which are of `spacing` width and >=`limit`.
        :param ndarray data: data
        :param float spacing: minimum spacing to the next peak (should be 1 or more)
        :param float limit: peaks should have value greater or equal
        :return array: detected peaks indexes array
        """
        len = data.size
        x = np.zeros(len + 2 * spacing)
        x[:spacing] = data[0] - 1.e-6
        x[-spacing:] = data[-1] - 1.e-6
        x[spacing:spacing + len] = data
        peak_candidate = np.zeros(len)
        peak_candidate[:] = True
        for s in range(spacing):
            start = spacing - s - 1
            h_b = x[start: start + len]  # before
            start = spacing
            h_c = x[start: start + len]  # central
            start = spacing + s + 1
            h_a = x[start: start + len]  # after
            peak_candidate = np.logical_and(peak_candidate, np.logical_and(h_c > h_b, h_c > h_a))

        ind = np.argwhere(peak_candidate)
        ind = ind.reshape(ind.size)
        if limit is not None:
            ind = ind[data[ind] > limit]
        return ind
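# Quick illustration of findpeaks on a tiny made-up array (values are arbitrary):
#   findpeaks(np.array([0., 1., 0., 3., 0., 0.5, 0.]), spacing=1, limit=0.8)
# returns array([1, 3]): the local maxima at indices 1 and 3 exceed the limit,
# while the smaller bump at index 5 (value 0.5) is filtered out.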
def get_12ECG_features(data, header_data):

    # Split the first line to get essential recording information
    first_line = header_data[0].split()
    ptID = first_line[0]
    num_leads = int(first_line[1])
    sample_Fs = int(first_line[2])
    
    # Per-lead gain: assumes the WFDB header convention where the third field of each
    # signal line holds the ADC gain, e.g. "1000/mV" or "1000.0(0)/mV"
    gain_lead = np.zeros(num_leads)
    for ii in range(num_leads):
        lead_line = header_data[ii + 1].split()
        gain_lead[ii] = float(lead_line[2].split('/')[0].split('(')[0])

    # Extract age, sex and diagnosis from the header comment lines
    age, sex, disease = None, None, None
    for line in header_data:
        if line.startswith('# Age'):
            age = int(line.split(': ')[1])
        elif line.startswith('# Sex'):
            sex = line.split(': ')[1].strip()
            sex = 1 if sex == 'Male' else 0  # Convert sex to binary (Male: 1, Female: 0)
        elif line.startswith('# Dx'):
            disease = int(line.split(': ')[1])

    # We are only using data from lead1
    peaks, idx = detect_peaks(data[0], sample_Fs, gain_lead[0])

    # mean
    mean_RR = np.mean(idx/sample_Fs*1000)
    mean_Peaks = np.mean(peaks*gain_lead[0])

    # median
    median_RR = np.median(idx/sample_Fs*1000)
    median_Peaks = np.median(peaks*gain_lead[0])

    # standard deviation
    std_RR = np.std(idx/sample_Fs*1000)
    std_Peaks = np.std(peaks*gain_lead[0])

    # variance
    var_RR = stats.tvar(idx/sample_Fs*1000)
    var_Peaks = stats.tvar(peaks*gain_lead[0])

    # skewness
    skew_RR = stats.skew(idx/sample_Fs*1000)
    skew_Peaks = stats.skew(peaks*gain_lead[0])

    # kurtosis
    kurt_RR = stats.kurtosis(idx/sample_Fs*1000)
    kurt_Peaks = stats.kurtosis(peaks*gain_lead[0])

    features = np.hstack([age, sex, mean_RR, mean_Peaks, median_RR, median_Peaks, std_RR, std_Peaks, var_RR, var_Peaks, skew_RR, skew_Peaks, kurt_RR, kurt_Peaks])

    return features
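# Hypothetical usage of get_12ECG_features, assuming the matching .hea file sits next to
# the .mat file (header_lines and features are illustrative names):
#   with open(r'C:\Users\Alex\Downloads\datasets\training\cpsc_2018\g1\A0001.hea') as f:
#       header_lines = f.readlines()
#   features = get_12ECG_features(scipy.io.loadmat(r'C:\Users\Alex\Downloads\datasets\training\cpsc_2018\g1\A0001.mat')['val'], header_lines)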
# Load ECG data from .mat file
data = scipy.io.loadmat('C:\\Users\\Alex\\Downloads\\datasets\\training\\cpsc_2018\\g1\\A0001.mat')

# Extract ECG measurements from the loaded data
ecg_measurements = data['val']

# Use a single lead for peak detection (flatten() would concatenate all 12 leads end to end)
ecg_measurements = ecg_measurements[0]

# Define signal frequency and gain
signal_frequency = 500  # Sampling frequency (Hz)
gain = 1000  # ADC gain (ADU per mV)

# Call detect_peaks function to detect R peaks
detected_peaks_values, detected_peaks_indices = detect_peaks(ecg_measurements, signal_frequency, gain)

# Plot the ECG signal with detected R peaks
plt.figure(figsize=(10, 6))
plt.plot(ecg_measurements, label='ECG Signal')
plt.scatter(detected_peaks_indices, detected_peaks_values, color='red', label='R Peaks')
plt.title('ECG Signal with Detected R Peaks')
plt.xlabel('Sample')
plt.ylabel('Amplitude')
plt.legend()
plt.grid(True)
# Stretch the x-axis by setting xlim to cover a smaller range of samples
plt.xlim(0, len(ecg_measurements) // 20)  # Adjust the divisor as needed
plt.show()
python machine-learning signal-processing
1 Answer

Your scatter plot is plotting detected_peaks_values. That signal was squared inside detect_peaks, so it is always positive and is not the value of the trace. It does appear to sit at the correct position on the time axis (next to the largest-amplitude peak, even when that peak is negative).

If you want to highlight the peaks, try changing

plt.scatter(detected_peaks_indices, detected_peaks_values, color='red', label='R Peaks')

to

plt.scatter(detected_peaks_indices, ecg_measurements[detected_peaks_indices], color='red', label='R Peaks')