Adding Completeness Module #141
base: development
Changes from 2 commits
requirements.txt
@@ -8,3 +8,5 @@ h5py
 matplotlib
 packaging
 pyyaml
+plotly
+avro
skdh/completeness/__init__.py
@@ -0,0 +1,25 @@
from skdh.completeness.complete import *
from skdh.completeness.helpers import *
from skdh.completeness.parse import *
from skdh.completeness.utils import *
from skdh.completeness.visualizations import *

__all__ = (
| "completeness_pipe", | ||
| "convert_sfreq_to_sampling_interval", | ||
| "from_unix", | ||
| "to_unix", | ||
| "find_nan_regions", | ||
| "vivalink_parse_ecg_data", | ||
| "vivalink_parse_acc_data", | ||
| "empatica_parse_acc_data", | ||
| "compute_summary_metrics", | ||
| "dic_to_str", | ||
| "clean_df", | ||
| "compute_completeness_master", | ||
| "find_charging_periods", | ||
| "find_wear_periods", | ||
| "calculate_completeness_timescale", | ||
| "compute_completeness", | ||
| "truncate_data_dic" | ||
| ) | ||
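Given the star imports and the __all__ tuple above, downstream code can pull the public names straight from the subpackage (a minimal sketch):

    from skdh.completeness import completeness_pipe, find_nan_regions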
skdh/completeness/complete.py
@@ -0,0 +1,127 @@
import os
import pickle
import numpy as np
from skdh.base import BaseProcess
from skdh.completeness.utils import load_subject_data, compute_completeness_master, compute_summary_metrics, check_hyperparameters
from skdh.completeness.visualizations import visualize_overview_plot, plot_completeness, plot_data_gaps, plot_timescale_completeness


class completeness_pipe(BaseProcess):
| r""" | ||
| Pipeline for assessing signal completeness. | ||
| """ | ||
|
|
||
| def __init__( | ||
LukasAdamowicz marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self, | ||
| subject, | ||
| subject_folder, | ||
| device_name, | ||
| fpath_output, | ||
| measures, | ||
| resample_width_mins=5, | ||
| gap_size_mins=30, | ||
| ranges=None, | ||
| data_gaps=None, | ||
| time_periods=None, | ||
| timescales=None): | ||

        check_hyperparameters(subject,
                              subject_folder,
                              device_name,
                              fpath_output,
                              measures,
                              resample_width_mins,
                              gap_size_mins,
                              ranges,
                              data_gaps,
                              time_periods,
                              timescales)

        super().__init__(
            subject=subject,
            subject_folder=subject_folder,
            device_name=device_name,
            fpath_output=fpath_output,
            measures=measures,
            resample_width=resample_width_mins,
            gap_size_mins=gap_size_mins,
            ranges=ranges,
            data_gaps=data_gaps,
            time_periods=time_periods,
            timescales=timescales)

        self.subject = subject
        self.subject_folder = subject_folder
        self.device_name = device_name
        self.fpath_output = fpath_output
        self.measures = measures
        self.resample_width_mins = resample_width_mins
        self.gap_size_mins = gap_size_mins
        self.ranges = ranges
        self.data_gaps = data_gaps
        self.time_periods = time_periods
        self.timescales = timescales

    # @handle_process_returns(results_to_kwargs=True)
    def predict(self,
                generate_figures=True,
                **kwargs):
        """
        Compute completeness and save results. Create and save figures if generate_figures is True.
        """
        super().predict(
            expect_days=False,
            expect_wear=False,
            **kwargs,
        )
        data_dic = load_subject_data(self.subject_folder, self.subject, self.device_name, self.measures, self.ranges)

        if self.time_periods is None:
Collaborator: I know this has to be handled here, but we might need to give some thought to how time periods would be specified for multiple different subjects, i.e., can they only be specified via hours from recording start, etc.?

Author: The time periods are specified by the user as times in numpy datetime format, or as 'daily', in which case it's recording start time plus 24 h, 48 h, etc. Since the pipeline works on each subject independently, any batch processing of multiple subjects would just be handled by the user looping through all subjects. Not sure if this is consistent with the rest of SKDH processing, but I think so?
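For illustration, a minimal sketch of the two accepted forms described in the reply above (the datetime values are hypothetical):

    import numpy as np

    # Explicit windows: list of (start, end) tuples of np.datetime64 values
    time_periods = [(np.datetime64('2023-01-01T00:00'), np.datetime64('2023-01-02T00:00'))]
    # Or 'daily': split the recording into 24 h windows from recording start
    time_periods = 'daily'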
            self.time_periods = [(np.min([x.index[0] for x in data_dic['Measurement Streams'].values()]),
                                  np.max([x.index[-1] for x in data_dic['Measurement Streams'].values()]))]
        elif self.time_periods == 'daily':
            t0 = np.min([x.index[0] for x in data_dic['Measurement Streams'].values()])
            t1 = np.max([x.index[-1] for x in data_dic['Measurement Streams'].values()])
            no_days = int(np.ceil((t1 - t0) / np.timedelta64(24, 'h')))
            self.time_periods = [(t0 + k * np.timedelta64(24, 'h'), t0 + (k + 1) * np.timedelta64(24, 'h') if
                                  t1 > t0 + (k + 1) * np.timedelta64(24, 'h') else t1) for k in range(no_days)]

        # Compute completeness metrics
        completeness_master_dic = compute_completeness_master(data_dic, data_gaps=self.data_gaps, time_periods=self.time_periods,
                                                              timescales=self.timescales)

        # Save raw results and summary metrics
        os.system('mkdir ' + self.fpath_output + '/' + data_dic['Subject ID'])
        os.system('mkdir ' + self.fpath_output + '/' + data_dic['Subject ID'] + '/' + self.device_name)
        pickle.dump(completeness_master_dic,
                    open(self.fpath_output + '/' + data_dic['Subject ID'] + '/' + self.device_name + '/raw_completeness', 'wb'))
        df_summary = compute_summary_metrics(completeness_master_dic, self.time_periods, self.timescales,
                                             self.measures)  # Daily wear time, charging, data gaps, native completeness
        df_summary.to_csv(self.fpath_output + '/' + data_dic['Subject ID'] + '/' + self.device_name + '/summary_metrics.csv')

        # Create and save visualizations of results
        figures = {}
        if generate_figures:
            fpath_dir = self.fpath_output + '/' + data_dic['Subject ID'] + '/' + self.device_name + '/'
            overview_fig = visualize_overview_plot(data_dic=data_dic, fpath=fpath_dir + 'overview.html',
                                                   resample_width_mins=self.resample_width_mins, gap_size_mins=self.gap_size_mins,
                                                   time_periods=self.time_periods)
            reason_color_dic = {'normal': 'blue', 'unknown': 'magenta', 'charging': 'orange', 'no_wear': 'red'}
            figures.update({'overview': overview_fig})
            completeness_fig = plot_completeness(completeness_master_dic, data_dic, self.time_periods,
                                                 fpath=fpath_dir + 'completeness', reason_color_dic=reason_color_dic)
            figures.update({'Completeness': completeness_fig})
            if self.data_gaps is not None:
                data_gap_fig = plot_data_gaps(completeness_master_dic, data_dic, self.data_gaps, self.time_periods,
                                              fpath=fpath_dir + 'data_gaps', reason_color_dic=reason_color_dic)
                figures.update({'data_gaps': data_gap_fig})
            if self.timescales is not None:
                timescale_compl_fig = plot_timescale_completeness(completeness_master_dic, data_dic, self.time_periods,
                                                                  self.timescales,
                                                                  fpath=fpath_dir + 'timescale_completeness',
                                                                  reason_color_dic=reason_color_dic)
                figures.update({'timescale_completeness': timescale_compl_fig})

        return {"Completeness": {'data_dic': data_dic, 'compl_dic': completeness_master_dic,
                                 'df_summary': df_summary, 'figures': figures}}
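As a usage illustration, a minimal sketch of driving the pipeline end to end (all argument values here are hypothetical, and the measure names must match columns produced by the parsers below):

    from skdh.completeness import completeness_pipe

    pipe = completeness_pipe(
        subject='subject_01',
        subject_folder='/data/subject_01/',
        device_name='vivalink',
        fpath_output='/results',
        measures=['ecg_raw'],
        time_periods='daily',
    )
    results = pipe.predict(generate_figures=True)
    df_summary = results['Completeness']['df_summary']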
skdh/completeness/helpers.py
@@ -0,0 +1,59 @@
import datetime
import numpy as np


def convert_sfreq_to_sampling_interval(x):
    """
    Convert sfreq in Hz (samples/second) to sampling interval (timedelta64[ns]).
    :param x: np.array | float | int, sampling frequency in samples/second (Hz).
    :return: timedelta64[ns], sampling interval as a time delta.
    """
    return np.timedelta64(1, 'ns') * (1 / x * 10 ** 9)
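As a quick sanity check of the formula above (a sketch; assumes the module is importable):

    import numpy as np
    from skdh.completeness.helpers import convert_sfreq_to_sampling_interval

    # 128 Hz -> one sample every 7,812,500 ns
    assert convert_sfreq_to_sampling_interval(128) == np.timedelta64(7812500, 'ns')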
def from_unix(ts, time_unit='s', utc_offset=0):
Collaborator: Can this not be handled by a pandas function?

Author: It could be handled by pd.to_timedelta, but I'm not sure there's any benefit of using that over numpy, since it returns the same data format.

Collaborator: The benefit would be not requiring a function that has to be maintained by us when one already exists.

Author: Sorry, I thought you meant the np.timedelta64 function. pd.DataFrame.to_timestamp does not do the same thing as this whole function.
| """ | ||
| Utility function to convert a unix time to timestamp. | ||
| Parameters | ||
| ---------- | ||
| ts : unix time in seconds or series of unix times | ||
| time_unit : 's' ! 'ms' Str, 's' if ts is ~10 ** 9, 'ms' if ts ~10 ** 12 | ||
| utc_offset : utc_offset due time zone in hours | ||
| Return | ||
| ------ | ||
| float | ||
| """ | ||
| time_start = np.datetime64("1970-01-01T00:00:00") | ||
| return ts * np.timedelta64(1, time_unit) + time_start + np.timedelta64(utc_offset * 60 ** 2, "s") | ||


def to_unix(ts, time_unit='s', utc_offset=0):
    """
    Utility function to convert a timestamp to unix time.
    Parameters
    ----------
    ts : timestamp or series of timestamps
    time_unit : str, 's' | 'ms'; 's' for unix time ~10 ** 9, 'ms' for ~10 ** 12
    utc_offset : utc offset due to time zone, in hours
    Return
    ------
    float
    """
    time_start = np.datetime64("1970-01-01T00:00:00")
    return ((ts - time_start) - np.timedelta64(utc_offset, 'h')) / np.timedelta64(1, time_unit)
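A quick round-trip check of the two helpers above (values chosen arbitrarily):

    import numpy as np
    from skdh.completeness.helpers import from_unix, to_unix

    ts = 1600000000  # unix seconds
    dt = from_unix(ts, time_unit='s')  # -> np.datetime64('2020-09-13T12:26:40')
    assert to_unix(dt, time_unit='s') == ts  # inverse operation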
def find_nan_regions(arr):
    """
    Find the contiguous NaN regions in a 1-d array.
    :param arr: np.array, 1-d array of values.
    :return: start indices, end indices, and lengths of the NaN regions.
    """
    nan_inds = np.where(np.isnan(arr))[0]
    if len(nan_inds) > 0:
        end_nan_inds = np.append(nan_inds[np.where(~(np.diff(nan_inds) == 1))[0]], nan_inds[-1])
        start_nan_inds = np.insert(nan_inds[::-1][np.where(~(np.diff(nan_inds[::-1]) == -1))[0]][::-1], 0, nan_inds[0])
        nan_region_lengths = end_nan_inds - start_nan_inds + 1
    else:
        start_nan_inds, end_nan_inds, nan_region_lengths = np.array([]), np.array([]), np.array([])
    return start_nan_inds, end_nan_inds, nan_region_lengths
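For example, a minimal sketch of the expected output:

    import numpy as np
    from skdh.completeness.helpers import find_nan_regions

    arr = np.array([1.0, np.nan, np.nan, 4.0, np.nan])
    starts, ends, lengths = find_nan_regions(arr)
    # starts == [1, 4], ends == [2, 4], lengths == [2, 1]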
skdh/completeness/meson.build
@@ -0,0 +1,12 @@
py3.install_sources(
    [
        '__init__.py',
        'complete.py',
        'helpers.py',
        'parse.py',
        'utils.py',
        'visualizations.py',
    ],
    pure: false,
    subdir: 'skdh/completeness',
)
skdh/completeness/parse.py
@@ -0,0 +1,134 @@
import numpy as np
import pandas as pd
import warnings
import re
import os
from skdh.completeness.helpers import from_unix, convert_sfreq_to_sampling_interval

def vivalink_parse_ecg_data(df, ecg_key, s_freq):
    if type(s_freq) == int:
        n_samples = np.repeat(s_freq, repeats=len(df))
        # One sampling-interval entry per sample, matching the length of the output frame
        sfreqs = np.repeat(np.array((1 / n_samples * 10 ** 9).astype(int), dtype='<m8[ns]'), repeats=n_samples)
    elif type(s_freq) == str:
        n_samples = df[s_freq]
        sfreqs = np.array([], dtype='<m8[ns]')
        for c in range(len(df)):
            sfreqs = np.append(sfreqs, np.repeat(np.timedelta64(int(1 / df.iloc[c, df.columns.get_loc(s_freq)] * 10 ** 9), 'ns'), repeats=df[s_freq].iloc[c]))
    else:
        raise TypeError('s_freq has to be an int, or a string if a key in df')
    ecg_signal = []
    device = np.array([])
    df_ecg = pd.DataFrame()
    times = np.array([])
    for k in range(len(df)):
        ecg_records = df[ecg_key].iloc[k].replace('[', '').replace(']', '').split(',')
        ecg_segment = np.array([np.nan] * n_samples[k])
        if not len(ecg_records) == n_samples[k]:
            warnings.warn('Not correct number of samples in df entry. Presuming they are in the beginning of the'
                          ' measurement period.')
        for c, ele in enumerate(df[ecg_key].iloc[k].replace('[', '').replace(']', '').split(',')):
            if 'null' not in ele:
                ecg_segment[c] = float(ele)
            else:
                ecg_segment[c] = np.nan
        ecg_signal.append(ecg_segment)
        times = np.append(times, np.array(df['Record Time'][k]).repeat(n_samples[k]) + (np.arange(n_samples[k]) * 1 / n_samples[k] * 1000))
        device = np.append(device, np.array(df['Device ID'][k]).repeat(n_samples[k]))
    ecg_signal = np.array(ecg_signal).ravel()
    df_ecg['Time Unix (ms)'] = times
    df_ecg['ecg_raw'] = ecg_signal
    df_ecg['Sampling_Freq'] = sfreqs
    df_ecg['Device ID'] = device

    return df_ecg


def vivalink_parse_acc_data(df, acc_key):
    acc_samp = []
    for k in range(len(df)):
        sensor_info = df['Sensor Info'].iloc[k].split(';')
        acc_samp.append(int(
            re.sub("[^0-9]", "", sensor_info[np.where(['accSamplingFrequency' in x for x in sensor_info])[0][0]])))

    acc_signal = np.array([]).reshape((0, 3))
    device = np.array([])
    times = np.array([])
    sfreq = np.array([])
    for k in range(len(df)):
        acc_records = np.array(df[acc_key].iloc[k].split(','))
        x_inds = np.array(['x' in x for x in acc_records])
        y_inds = np.array(['y' in x for x in acc_records])
        z_inds = np.array(['z' in x for x in acc_records])
        assert np.sum(x_inds) == np.sum(y_inds), \
            'Number of x and y acceleration data points not the same!'
        assert np.sum(y_inds) == np.sum(z_inds), \
            'Number of y and z acceleration data points not the same!'
        acc_data = np.array([re.sub("[^0-9]", "", x) for x in acc_records], dtype=float)
        x_acc = acc_data[x_inds]
        y_acc = acc_data[y_inds]
        z_acc = acc_data[z_inds]
        acc_mat = np.array([x_acc, y_acc, z_acc]).T
        if not acc_mat.shape[0] == acc_samp[k]:
            warnings.warn('Not correct number of samples in df entry. Presuming they are in the beginning of the'
                          ' measurement period.')
        acc_segment = np.array([[[np.nan] * 3] * acc_samp[k]]).squeeze()
        acc_segment[:len(acc_mat), :] = acc_mat
        acc_signal = np.vstack((acc_signal, acc_segment))
        device = np.append(device, np.array(df['Device ID'][k]).repeat(acc_samp[k]))
        times = np.append(times, np.array(df['Record Time'][k]).repeat(acc_samp[k]) + (np.arange(acc_samp[k]) * 1 / acc_samp[k] * 1000))
        sfreq = np.append(sfreq, np.ones(acc_samp[k]) * acc_samp[k])
    triaxial_acc = []
    for c, axis in enumerate(['x', 'y', 'z']):
        df_acc = pd.DataFrame()
        df_acc['Time Unix (ms)'] = times
        df_acc['acc_raw_' + axis] = acc_signal[:, c]
        df_acc['Sampling Frequency (Hz)'] = sfreq
        df_acc['Device ID'] = device
        triaxial_acc.append(df_acc)
    return triaxial_acc


def empatica_parse_acc_data(fdir_raw):
    from avro.datafile import DataFileReader
    from avro.io import DatumReader

    fnames = os.listdir(fdir_raw)
    times_all = np.array([])
    acc_raw_all = np.array([]).reshape((0, 3))
    devices_all = []
    sfreq_all = []
    for fname in fnames:
        if fname[-5:] == '.avro':
            reader = DataFileReader(open(fdir_raw + fname, "rb"), DatumReader())
            data_list = []
            for data_frac in reader:
                data_list.append(data_frac)
            reader.close()
            if not data_list[0]['rawData']['accelerometer']['timestampStart'] == 0:  # Avoid empty files
                times = data_list[0]['rawData']['accelerometer']['timestampStart'] / 10 ** 6 + data_list[0]['rawData'][
                    'accelerometer']['samplingFrequency'] ** -1 * np.arange(len(data_list[0]['rawData']['accelerometer']['x']))
                acc_raw = np.array([data_list[0]['rawData']['accelerometer']['x'],
                                    data_list[0]['rawData']['accelerometer']['y'],
                                    data_list[0]['rawData']['accelerometer']['z']]).T
                acc_raw = acc_raw * data_list[0]['rawData']['accelerometer']['imuParams']['physicalMax'] / \
                          data_list[0]['rawData']['accelerometer']['imuParams']['digitalMax']
                devices_all = devices_all + [data_list[0]['deviceSn']] * len(times)
                sfreq_all = sfreq_all + [data_list[0]['rawData']['accelerometer']['samplingFrequency']] * len(times)
                times_all = np.concatenate((times_all, times))
                acc_raw_all = np.concatenate((acc_raw_all, acc_raw), axis=0)
    sort_inds = np.argsort(times_all)
    times_all = times_all[sort_inds]
    acc_raw_all = acc_raw_all[sort_inds]
    devices_all = np.array(devices_all)[sort_inds]
    sfreq_all = np.array(sfreq_all)[sort_inds]

    dfs = []
    for c, axis in enumerate(['x', 'y', 'z']):
        df_acc_raw = pd.DataFrame(data={'acc_raw_' + axis: acc_raw_all[:, c]}, index=times_all)
        df_acc_raw['Device ID'] = devices_all
        df_acc_raw['Sampling_Freq'] = convert_sfreq_to_sampling_interval(sfreq_all)
        df_acc_raw.index = from_unix(df_acc_raw.index, time_unit='s')
        dfs.append(df_acc_raw)

    return dfs
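To make the expected Vivalink input concrete, a hedged sketch inferred from the column accesses in vivalink_parse_ecg_data above ('Record Time' in unix ms, 'Device ID', and the ECG payload as a bracketed comma-separated string); the values and the 'Num Samples' column name are hypothetical:

    import pandas as pd
    from skdh.completeness.parse import vivalink_parse_ecg_data

    df = pd.DataFrame({
        'Record Time': [1600000000000, 1600000001000],  # unix ms
        'Device ID': ['dev1', 'dev1'],
        'ecg': ['[1,2,3]', '[4,null,6]'],  # 'null' entries become NaN
        'Num Samples': [3, 3],  # samples per record
    })
    df_ecg = vivalink_parse_ecg_data(df, ecg_key='ecg', s_freq='Num Samples')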