Files
Feature-Extraction/processing.py
2025-10-20 22:01:18 +08:00

190 lines
9.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import neurokit2 as nk
import numpy as np
import pandas as pd
from global_var import global_var_init
cycle, fs, record_name, data_path = global_var_init()
from pyPPG.datahandling import load_data
from pyPPG import PPG
import pyPPG.fiducials as FP
import pyPPG.preproc as PP
# 归一化函数
def normalize(data):
min_val = np.min(data)
max_val = np.max(data)
return (data - min_val) / (max_val - min_val)
def processing():
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#---------------------------- ECG --------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
# 2. 读 CSV
df = pd.read_csv(data_path + '//' + record_name)
# 滤波注意我对nk软件包中method="neurokit"的源代码进行了部分修改!)
ecg_signal = nk.ecg_clean(df['ECG_I'], sampling_rate = fs, method="neurokit")
# 创建时间向量
t_ecg = np.arange(0, len(ecg_signal))/fs
# 归一化
ecg_signal = normalize(ecg_signal)
_, rpeaks = nk.ecg_peaks(ecg_signal, sampling_rate = fs)
signal_dwt, waves_dwt = nk.ecg_delineate(ecg_signal,
rpeaks,
sampling_rate = fs,
method="dwt", #此处方法有cwt和dwt可选
show_type='all',
)
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#---------------------------- PPG --------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
# 加载原始PPG信号
record_path = data_path + '//' + record_name
signal = load_data(data_path = record_path, fs=fs, use_tk=False)
signal.v = -signal.v[:, 3] # 可用-1将信号取反便于查看,后面是列的索引位置(信道)
# 创建时间向量
t_ppg = np.arange(0, len(signal.v))/fs
# 滤波
signal.filtering = True # whether or not to filter the PPG signal
signal.fL = 0.5000001 # Lower cutoff frequency (Hz)
signal.fH = 10 # Upper cutoff frequency (Hz)
signal.order = 2 # Filter order
prep = PP.Preprocess(fL=signal.fL, fH=signal.fH, order=signal.order)
signal.ppg, signal.vpg, signal.apg, signal.jpg = prep.get_signals(s=signal)
# Create a PPG class
s = PPG(signal)
fpex = FP.FpCollection(s=s)
fiducials = fpex.get_fiducials(s=s)
fiducials_df = pd.DataFrame(fiducials)
# 提取行的数据注意Pandas的索引是从0开始的
rows_to_extract = fiducials_df.iloc[1:-1] # 提取第1到最后1行
# 归一化
signal.ppg = normalize(signal.ppg)
# signal.ppg = np.array(signal.ppg, dtype=np.float64) # 强制转换
signal.vpg = normalize(signal.vpg)
signal.apg = normalize(signal.apg)
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#---------------------------- 获取每一列的数据及对应的纵坐标值(PPG) --------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
#--------------------------------------------------------------------------------------------------------
# 获取'on'列的数据
on_column_data = rows_to_extract['on']
on_column_data_toArea = rows_to_extract['on']
on_column_list = [idx for idx in on_column_data.tolist() if not pd.isna(idx)]
on_column_list_toArea = [idx for idx in on_column_data_toArea.tolist() if not pd.isna(idx)]
# 获取'on'列对应的纵坐标值
on_values = [(signal.ppg[idx]) for idx in on_column_list]
on_values_toArea = [(signal.ppg[idx]) for idx in on_column_list_toArea]
# 获取'sp'列的数据
sp_column_data = rows_to_extract['sp']
sp_column_list = [idx for idx in sp_column_data.tolist() if not pd.isna(idx)]
# 获取'sp'列对应的纵坐标值
sp_values = [(signal.ppg[idx]) for idx in sp_column_list]
# 获取'dn'列的数据
dn_column_data = rows_to_extract['dn']
dn_column_list = [idx for idx in dn_column_data.tolist() if not pd.isna(idx)]
# 获取'dn'列对应的纵坐标值
dn_values = [(signal.ppg[idx]) for idx in dn_column_list]
# 获取'dp'列的数据
dp_column_data = rows_to_extract['dp']
dp_column_list = [idx for idx in dp_column_data.tolist() if not pd.isna(idx)]
# 获取'dp'列对应的纵坐标值
dp_values = [(signal.ppg[idx]) for idx in dp_column_list]
# 提取'u'列的数据
u_column_data = rows_to_extract['u']
u_column_data_toArea = rows_to_extract['u']
u_column_list = [idx for idx in u_column_data.tolist() if not pd.isna(idx)]
u_column_list_toArea = [idx for idx in u_column_data_toArea.tolist() if not pd.isna(idx)]
# 获取'u'列对应的纵坐标值
u_values = [(signal.vpg[idx]) for idx in u_column_list]
u_values_toArea = [(signal.vpg[idx]) for idx in u_column_list_toArea]
# 提取'v'列的数据
v_column_data = rows_to_extract['v']
v_column_list = [idx for idx in v_column_data.tolist() if not pd.isna(idx)]
# 获取'v'列对应的纵坐标值
v_values = [(signal.vpg[idx]) for idx in v_column_list]
# 提取'w'列的数据
w_column_data = rows_to_extract['w']
w_column_list = [idx for idx in w_column_data.tolist() if not pd.isna(idx)]
# 获取'w'列对应的纵坐标值
w_values = [(signal.vpg[idx]) for idx in w_column_list]
# 提取'a'列的数据
a_column_data = rows_to_extract['a']
a_column_data_toArea = rows_to_extract['a']
a_column_list = [idx for idx in a_column_data.tolist() if not pd.isna(idx)]
a_column_list_toArea = [idx for idx in a_column_data_toArea.tolist() if not pd.isna(idx)]
# 获取'a'列对应的纵坐标值
a_values = [(signal.apg[idx]) for idx in a_column_list]
a_values_toArea = [(signal.apg[idx]) for idx in a_column_list_toArea]
# 提取'b'列的数据
b_column_data = rows_to_extract['b']
b_column_list = [idx for idx in b_column_data.tolist() if not pd.isna(idx)]
# 获取'b'列对应的纵坐标值
b_values = [(signal.apg[idx]) for idx in b_column_list]
# 提取'c'列的数据
c_column_data = rows_to_extract['c']
c_column_list = [idx for idx in c_column_data.tolist() if not pd.isna(idx)]
# 获取'c'列对应的纵坐标值
c_values = [(signal.apg[idx]) for idx in c_column_list]
# 提取'e'列的数据
e_column_data = rows_to_extract['e']
e_column_list = [idx for idx in e_column_data.tolist() if not pd.isna(idx)]
# 获取'e'列对应的纵坐标值
e_values = [(signal.apg[idx]) for idx in e_column_list]
# 提取'f'列的数据
f_column_data = rows_to_extract['f']
f_column_list = [idx for idx in f_column_data.tolist() if not pd.isna(idx)]
# 获取'f'列对应的纵坐标值
f_values = [(signal.apg[idx]) for idx in f_column_list]
return (signal_dwt , waves_dwt , rpeaks , ecg_signal,
signal, signal.v, signal.fs, rows_to_extract, t_ecg, t_ppg,
on_column_list, on_column_list_toArea, on_values, on_values_toArea,
sp_column_list, sp_values, dn_column_list, dn_values, dp_column_list,
dp_values, u_column_list, u_column_list_toArea, u_values, u_values_toArea,
v_column_list, v_values, w_column_list, w_values, a_column_list, a_column_list_toArea,
a_values, a_values_toArea, b_column_list, b_values, c_column_list, c_values,
e_column_list, e_values, f_column_list, f_values,
signal.ppg, signal.vpg, signal.apg, signal.jpg)
if __name__ == "__main__":
processing()