"""ECG processing pipelines.

Turns batches of raw ECG samples into one feature table: each sample is
cleaned, optionally split into a baseline window plus analysis windows, and
HRV features are extracted for every window.
"""
import pandas as pd
import neurokit2 as nk

from .ecg_feature_extraction import get_hrv_features, normalize_features
from .utils import cut_out_window, create_windows
from .logger import setup_logger

# Module-level side effect: floats in displayed frames use six decimals.
pd.set_option('display.float_format', '{:.6f}'.format)

logger = setup_logger(__name__)


def process_window(window, window_id, frequency):
    """Extract the HRV features of one window as a single-row DataFrame.

    Args:
        window: DataFrame holding the columns ``ecg``, ``subject_id``,
            ``sample_id`` and ``timestamp_idx``. The timestamp values must
            support ``strftime`` (presumably pandas timestamps -- confirm
            against the caller).
        window_id: Identifier stored in the ``window_id`` column. The value
            ``0`` marks the row as the baseline window.
        frequency: Sampling frequency forwarded to ``get_hrv_features`` and
            stored in the ``frequency`` column.

    Returns:
        A one-row DataFrame with the extracted features followed by the
        metadata columns ``subject_id``, ``sample_id``, ``window_id``,
        ``w_start_time``, ``w_end_time``, ``baseline`` and ``frequency``.
    """
    time_format = '%Y-%m-%d %H:%M:%S'

    hrv_features = get_hrv_features(window['ecg'].values, frequency)
    feature_row = pd.DataFrame(hrv_features, index=[0])

    timestamps = window['timestamp_idx']
    return feature_row.assign(
        # Only the first distinct id is used for the whole window.
        subject_id=window['subject_id'].unique()[0],
        sample_id=str(window['sample_id'].unique()[0]),
        window_id=window_id,
        w_start_time=timestamps.min().strftime(time_format),
        w_end_time=timestamps.max().strftime(time_format),
        baseline=window_id == 0,
        frequency=frequency,
    )


def _sample_feature_frames(sample, configs):
    """Clean one sample's ECG signal and return its per-window feature rows."""
    frames = []
    signal_df = pd.DataFrame.from_dict(sample.dict())

    # Preprocess the ecg signal
    logger.info("Preprocess ECG signals...")
    signal_df['ecg'] = nk.ecg_clean(
        signal_df['ecg'],
        sampling_rate=sample.frequency,
        method="pantompkins1985",
    )

    # Cut out the windows and process them
    # NOTE(review): this is a truthiness test, so a falsy baseline_start
    # (e.g. 0 or "") disables the baseline -- confirm that is intended.
    if configs.baseline_start:
        logger.info("Cut out baseline window...")
        baseline = cut_out_window(
            signal_df,
            'timestamp_idx',
            start=configs.baseline_start,
            end=configs.baseline_end,
        )
        # Only data strictly after the baseline is split into windows.
        baseline_last = baseline['timestamp_idx'].max()
        signal_df = signal_df[signal_df['timestamp_idx'] > baseline_last]
        logger.info("Processing baseline window...")
        frames.append(process_window(baseline, 0, sample.frequency))

    logger.info("Cut out windows...")
    windows = create_windows(
        df=signal_df,
        time_column='timestamp_idx',
        window_size=configs.window_size,
        window_slicing_method=configs.window_slicing_method,
    )

    logger.info(f"Processing windows (Total: {len(windows)})...")
    # Window ids start at 1 because id 0 is reserved for the baseline.
    for window_no, window in enumerate(windows, start=1):
        frames.append(process_window(window, window_no, sample.frequency))
    return frames


def process_batch(samples, configs):
    """Compute window-level HRV features for a batch of ECG samples.

    Args:
        samples: Sized sequence of sample objects exposing ``dict()`` and
            ``frequency``. The dict must yield the columns consumed by
            ``process_window``.
        configs: Object providing ``baseline_start``, ``baseline_end``,
            ``window_size``, ``window_slicing_method`` and
            ``normalization_method``.

    Returns:
        One DataFrame with a row per processed window across all samples.
        When ``configs.baseline_start`` is truthy the table is additionally
        passed through ``normalize_features``.
    """
    features_list = []
    for sample_no, sample in enumerate(samples, start=1):
        logger.info(f"Processing sample ({sample_no}/{len(samples)})...")
        features_list.extend(_sample_feature_frames(sample, configs))

    features_df = pd.concat(features_list, ignore_index=True)

    # Normalize the features via baseline subtraction
    if configs.baseline_start:
        features_df = normalize_features(features_df, configs.normalization_method)
    return features_df