190 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			190 lines
		
	
	
		
			9.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
 | ||
| import neurokit2 as nk
 | ||
| import numpy as np
 | ||
| import pandas as pd
 | ||
| 
 | ||
from global_var import global_var_init

# Run-wide settings, resolved once at import time and read as module globals
# by the functions below: `fs` is passed as the sampling rate, while
# `data_path` and `record_name` are joined to locate the input record.
# `cycle` is unpacked here but not referenced elsewhere in this file.
cycle, fs, record_name, data_path = global_var_init()
 | ||
| 
 | ||
| from pyPPG.datahandling import load_data
 | ||
| from pyPPG import PPG
 | ||
| import pyPPG.fiducials as FP
 | ||
| import pyPPG.preproc as PP
 | ||
| 
 | ||
def normalize(data):
    """Min-max scale ``data`` so its smallest value maps to 0 and its largest to 1.

    Args:
        data: Numeric array-like (e.g. a NumPy array) supporting element-wise
            subtraction and division.

    Returns:
        ``(data - min) / (max - min)`` element-wise. When every element is
        equal (``max == min``) an all-zero float result of the same shape is
        returned instead of the NaNs a 0/0 division would produce.

    Raises:
        ValueError: If ``data`` is empty (raised by ``np.min``).
    """
    min_val = np.min(data)
    span = np.max(data) - min_val
    shifted = data - min_val
    if span == 0:
        # Flat signal: dividing by a zero range would fill the output with NaN
        # (and emit a RuntimeWarning). Multiplying by 0.0 keeps the shape and
        # forces a float result, matching the dtype of the normal path.
        return shifted * 0.0
    return shifted / span
 | ||
| 
 | ||
| 
 | ||
def _fiducial_points(rows, column, waveform):
    """Collect the non-missing sample indices in ``rows[column]`` and the ``waveform`` values there.

    Args:
        rows: DataFrame whose ``column`` holds sample indices (missing entries allowed).
        column: Name of the fiducial column to read.
        waveform: Indexable signal the indices refer to.

    Returns:
        ``(indices, values)`` - two lists of equal length.
    """
    # int(): pandas stores an integer column containing NaN as float64 unless a
    # nullable dtype is used, and NumPy refuses float indices. For values that
    # are already Python ints this is a no-op.
    indices = [int(idx) for idx in rows[column].tolist() if not pd.isna(idx)]
    values = [waveform[idx] for idx in indices]
    return indices, values


def processing():
    """Run the ECG and PPG pipelines on the configured record.

    Reads the module-level ``fs``, ``data_path`` and ``record_name``. The ECG
    column ``'ECG_I'`` is cleaned, min-max normalized, R-peak detected and
    delineated with neurokit2; the PPG channel (column index 3, sign-inverted)
    is filtered and its fiducial points are detected with pyPPG, after which
    the PPG/VPG/APG waveforms are min-max normalized.

    Returns:
        A 44-element tuple, in this order:
        ``signal_dwt, waves_dwt, rpeaks, ecg_signal`` (ECG results);
        ``signal, signal.v, signal.fs, rows_to_extract, t_ecg, t_ppg``;
        then, for the fiducial columns, index lists and the matching waveform
        values: ``on`` (list, list_toArea, values, values_toArea), ``sp``,
        ``dn``, ``dp`` (list, values) taken from ``signal.ppg``; ``u`` (list,
        list_toArea, values, values_toArea), ``v``, ``w`` (list, values) taken
        from ``signal.vpg``; ``a`` (list, list_toArea, values, values_toArea),
        ``b``, ``c``, ``e``, ``f`` (list, values) taken from ``signal.apg``;
        and finally ``signal.ppg, signal.vpg, signal.apg, signal.jpg``.
        Each ``*_toArea`` list is an independent copy of its counterpart.
    """
    # Both the ECG reader and the PPG loader consume the same record file.
    record_path = data_path + '//' + record_name

    # ------------------------------------------------------------------
    # ECG
    # ------------------------------------------------------------------
    df = pd.read_csv(record_path)

    # Filtering. NOTE (from the original author): the neurokit2 source for
    # method="neurokit" was locally modified, so a stock install may give
    # different results.
    ecg_signal = nk.ecg_clean(df['ECG_I'], sampling_rate=fs, method="neurokit")

    # Time axis in seconds, one entry per ECG sample.
    t_ecg = np.arange(0, len(ecg_signal)) / fs

    ecg_signal = normalize(ecg_signal)

    _, rpeaks = nk.ecg_peaks(ecg_signal, sampling_rate=fs)

    signal_dwt, waves_dwt = nk.ecg_delineate(
        ecg_signal,
        rpeaks,
        sampling_rate=fs,
        method="dwt",  # "cwt" is the alternative mentioned by the original author
        show_type='all',
    )

    # ------------------------------------------------------------------
    # PPG
    # ------------------------------------------------------------------
    # Load the raw recording.
    signal = load_data(data_path=record_path, fs=fs, use_tk=False)
    # Keep one channel (column index 3) and flip its sign for easier viewing.
    signal.v = -signal.v[:, 3]

    # Time axis in seconds, one entry per PPG sample.
    t_ppg = np.arange(0, len(signal.v)) / fs

    # Filter configuration.
    signal.filtering = True  # whether or not to filter the PPG signal
    signal.fL = 0.5000001  # Lower cutoff frequency (Hz)
    signal.fH = 10  # Upper cutoff frequency (Hz)
    signal.order = 2  # Filter order
    prep = PP.Preprocess(fL=signal.fL, fH=signal.fH, order=signal.order)
    signal.ppg, signal.vpg, signal.apg, signal.jpg = prep.get_signals(s=signal)

    # Fiducial detection runs on the filtered, not-yet-normalized signals.
    s = PPG(signal)
    fpex = FP.FpCollection(s=s)
    fiducials = fpex.get_fiducials(s=s)
    fiducials_df = pd.DataFrame(fiducials)
    # iloc[1:-1] drops the first and the last row and keeps everything between.
    rows_to_extract = fiducials_df.iloc[1:-1]

    # Normalize the waveforms that the fiducial values are read from.
    # signal.jpg is returned as produced by the preprocessor (not normalized).
    signal.ppg = normalize(signal.ppg)
    signal.vpg = normalize(signal.vpg)
    signal.apg = normalize(signal.apg)

    # ------------------------------------------------------------------
    # Fiducial indices and their ordinate values
    # ------------------------------------------------------------------
    # Columns read against the PPG waveform.
    on_column_list, on_values = _fiducial_points(rows_to_extract, 'on', signal.ppg)
    # The *_toArea variants carry the same content as separate list objects so
    # a caller mutating one does not affect the other.
    on_column_list_toArea = list(on_column_list)
    on_values_toArea = list(on_values)
    sp_column_list, sp_values = _fiducial_points(rows_to_extract, 'sp', signal.ppg)
    dn_column_list, dn_values = _fiducial_points(rows_to_extract, 'dn', signal.ppg)
    dp_column_list, dp_values = _fiducial_points(rows_to_extract, 'dp', signal.ppg)

    # Columns read against the VPG waveform.
    u_column_list, u_values = _fiducial_points(rows_to_extract, 'u', signal.vpg)
    u_column_list_toArea = list(u_column_list)
    u_values_toArea = list(u_values)
    v_column_list, v_values = _fiducial_points(rows_to_extract, 'v', signal.vpg)
    w_column_list, w_values = _fiducial_points(rows_to_extract, 'w', signal.vpg)

    # Columns read against the APG waveform.
    a_column_list, a_values = _fiducial_points(rows_to_extract, 'a', signal.apg)
    a_column_list_toArea = list(a_column_list)
    a_values_toArea = list(a_values)
    b_column_list, b_values = _fiducial_points(rows_to_extract, 'b', signal.apg)
    c_column_list, c_values = _fiducial_points(rows_to_extract, 'c', signal.apg)
    e_column_list, e_values = _fiducial_points(rows_to_extract, 'e', signal.apg)
    f_column_list, f_values = _fiducial_points(rows_to_extract, 'f', signal.apg)

    return (signal_dwt, waves_dwt, rpeaks, ecg_signal,
            signal, signal.v, signal.fs, rows_to_extract, t_ecg, t_ppg,
            on_column_list, on_column_list_toArea, on_values, on_values_toArea,
            sp_column_list, sp_values, dn_column_list, dn_values, dp_column_list,
            dp_values, u_column_list, u_column_list_toArea, u_values, u_values_toArea,
            v_column_list, v_values, w_column_list, w_values, a_column_list, a_column_list_toArea,
            a_values, a_values_toArea, b_column_list, b_values, c_column_list, c_values,
            e_column_list, e_values, f_column_list, f_values,
            signal.ppg, signal.vpg, signal.apg, signal.jpg)
 | ||
| 
 | ||
| 
 | ||
if __name__ == "__main__":
    # Script entry point: run the whole ECG + PPG pipeline once.
    # The returned tuple is intentionally discarded here.
    processing()