Source code for nrv.utils._stimulus

"""
NRV-:class:`.stimulus` handling.
"""

import faulthandler

import numpy as np

from ..backend._log_interface import pass_info, rise_warning, rise_error
from ..backend._NRV_Class import NRV_class
import matplotlib.pyplot as plt

# enable faulthandler to ease "segmentation faults" debug
faulthandler.enable()


###############
## Functions ##
###############
[docs] def is_stim(stim): """ check if an object is a stimulus, return True if yes, else False Parameters ---------- stim : object object to test Returns ------- bool True it the type is a stimulus object """ return isinstance(stim, stimulus)
[docs] def set_common_time_series(stim1, stim2): """ set two signals at the same time samples, missing samples are reconstructed by holding a value Parameters ---------- stim1 : stimulus object First stimulus to synchronize stim2 : stimulus object Second object to synchronize """ stim1.insert_samples(stim2.t) stim2.insert_samples(stim1.t)
[docs] def get_equal_timing_copies(stim1, stim2): """ create two stimuli with equal timing as copies of the input ones Parameters ---------- stim1 : stimulus object First stimulus to synchronize stim2 : stimulus object Second object to synchronize Returns -------- stima : stimulus object copie of stim1 with timings from stim1 and stim2 stimb : stimulus object copie of stim2 with timings from stim1 and stim2 """ stim_a = stimulus() stim_a.s = stim1.s stim_a.t = stim1.t stim_b = stimulus() stim_b.s = stim2.s stim_b.t = stim2.t stim_a.insert_samples(stim_b.t) stim_b.insert_samples(stim_a.t) return stim_a, stim_b
[docs] def datfile_2_stim(fname, dt=0.005): """Creates a stimulus from sampled data specified in a .dat file Parameters ---------- fname : str name of the file containing the sampled data dt : float, optional value of the sampling period, in ms. By default set to 0.005ms """ stim_1 = stimulus() # open the dat file and feed the stimulus data = np.genfromtxt(fname) stim_1.t = np.arange(len(data)) * dt stim_1.s = data return stim_1
################################ ## Stimulus object definition ## ################################
[docs] class stimulus(NRV_class): """ Stimulus class for NRV2, signals are defined as asynchronous signals, with s the values and t as occurence timings. """
[docs] def __init__(self, s_init=0): """ Instantiation of a stimulus object. Parameters ---------- s_init : float initial value of the signal, by default 0 """ super().__init__() self.type = "stimulus" self.s = np.array([s_init]) self.t = np.array([0])
## Save and Load mehtods
[docs] def save_stimulus(self, save=False, fname="stimulus.json"): rise_warning("save_stimulus is a deprecated method use save") self.save(save=save, fname=fname)
[docs] def load_stimulus(self, data="stimulus.json"): rise_warning("load_stimulus is a deprecated method use load") self.load(data=data)
############################ ## basic handling methods ## ############################
[docs] def append(self, s, t): """ Append a sample to the signal, internal use only. Parameters ---------- s : float value of the signal to append, numpy array like t : float time of signal change """ self.s = np.append(self.s, np.asarray(s)) self.t = np.append(self.t, np.asarray(t))
[docs] def concatenate(self, s, t, t_shift=0): """ Concatenate samples to a signal, internal use only. Parameters ---------- s : float values of signal, numpy array like t : float time serie of the values, numpy array like t_shift : float time to shift the t values at the end of original signal, by default equal to 0. Here to prevent possible duplicates if the t serie starts with 0 """ self.s = np.append(self.s, np.asarray(s)) self.t = np.append(self.t, np.asarray(t) + self.t[-1] + t_shift)
[docs] def len(self): """ Returns the length of the signal Returns ------- len : int length of the signal (s and t attribute should have the same length if the signal is not corrupted) """ return len(self.s)
[docs] def sort(self): """ Sort the signal with increasing timings """ ordered_ind = np.argsort(self.t) self.s = self.s[ordered_ind] self.t = self.t[ordered_ind]
[docs] def insert_samples(self, t): """ Insert samples inside a signal and adapt values consequently Parameters ---------- t : array, np.array or list time serie of samples to insert, numpy array-like, dosent have to be sorted """ # add the new time values at their place and remove duplicates new_t = np.unique(np.sort(np.append(self.t, np.asarray(t)))) new_s = [self.s[0]] j = 1 for k in range(1, len(new_t)): if j < self.len(): if new_t[k] < self.t[j]: # added sample, hold the last value new_s.append(new_s[-1]) else: # orinal sample, retrieve value in original signal and increment counter new_s.append(self.s[j]) j += 1 else: new_s.append(self.s[-1]) self.s = np.asarray(new_s) self.t = new_t
[docs] def snap_time(self, dt_min): i_mask = [True for _ in range(len(self.t))] for i in range(len(self.t) - 1): j = 0 if self.t[i + 1] - self.t[i] < dt_min: while i > j and not i_mask[i - j]: j += 1 if self.t[i + 1] - self.t[i - j] < dt_min: i_mask[i + 1] = False self.s = self.s[i_mask] self.t = self.t[i_mask]
[docs] def plot(self, ax: plt.axes, scatter=False, **ax_kwargs): """ Plot the stimulus """ if scatter: ax.scatter(self.t, self.s, **ax_kwargs) else: ax.step(self.t, self.s, where="post", **ax_kwargs)
##################### ## special methods ## ##################### def __len__(self): return self.len() def __abs__(self): stim = stimulus() stim.s = np.absolute(self.s) stim.t = self.t return stim def __neg__(self): stim = stimulus() stim.s = -self.s stim.t = self.t return stim def __add__(self, b): C = stimulus() if is_stim(b): A, B = get_equal_timing_copies(self, b) C.s = A.s + B.s C.t = A.t else: C.s = self.s + float(b) C.t = self.t return C def __sub__(self, b): C = stimulus() if is_stim(b): A, B = get_equal_timing_copies(self, b) C.s = A.s - B.s C.t = A.t else: C.s = self.s - float(b) C.t = self.t return C def __mul__(self, b): C = stimulus() if is_stim(b): A, B = get_equal_timing_copies(self, b) C.s = A.s * B.s C.t = A.t else: C.s = self.s * float(b) C.t = self.t return C def __radd__(self, b): C = stimulus() if is_stim(b): A, B = get_equal_timing_copies(self, b) C.s = B.s + A.s C.t = A.t else: C.s = float(b) + self.s C.t = self.t return C def __rsub__(self, b): C = stimulus() if is_stim(b): A, B = get_equal_timing_copies(self, b) C.s = B.s - A.s C.t = A.t else: C.s = float(b) - self.s C.t = self.t return C def __rmul__(self, b): C = stimulus() if is_stim(b): A, B = get_equal_timing_copies(self, b) C.s = B.s * A.s C.t = A.t else: C.s = float(b) * self.s C.t = self.t return C def __pow__(self, p): C = stimulus() C.s = self.s ** float(p) return C def __eq__(self, b): if is_stim(b): A, B = get_equal_timing_copies(self, b) flag = np.array_equal(A.s, B.s) and np.array_equal(A.t, B.t) else: flag = True for value in self.s: if value != b: flag = False return flag def __ne__(self, b): # self != b return not self == b
[docs] def integrate(self): return np.trapz(self.s, x=self.t)
####################### ## signal generators ## #######################
[docs] def constant(self, value, start=0): """ Ceat a constant signal Parameters ---------- value : float Value of the constant signal start :float starting time of the constant signal, by default set to 0 """ self.append(value, start)
[docs] def pulse(self, start, value, duration=0): """ Create pulse shape signal samples Parameters ---------- value : float value of the pulse start : float starting time of the pulse duration : float duration of the pulse, optional and by default equal to zero, if non zero value, a point is added at the end of the pulse with the previous value """ s_last = self.s[-1] self.append(value, start) if duration != 0: self.append(s_last, start + duration)
[docs] def biphasic_pulse( self, start, s_cathod, t_stim, s_anod, t_inter, anod_first=False ): """ Create a biphasic pulse waveform Parameters ---------- start : float starting time of the waveform, in ms s_cathod : float cathodic (negative stimulation value) current, in uA t_stim : float stimulation time, in ms s_anod : float anodic (positive stimulation value) current and, in uA t_inter : float inter pulse timing, in ms anod_first : bool if true, stimulation is anodic and begins with the anodic value and is balanced with cathodic value, else stimuation is cathodic and begins with the cathodic value and is balances with anodic value, by default set to False (cathodic first as most stimulation protocols) WARNING ------- `s_cathod` must always positive, the user give here the absolute value """ if not anod_first: s_1 = -s_cathod s_2 = s_anod else: s_2 = -s_cathod s_1 = s_anod if s_2 != 0: t_balance = abs(s_1 / s_2) * t_stim else: t_balance = 0 self.concatenate(s_1, start) if t_inter == 0: self.concatenate(s_2, t_stim) if t_balance != 0: self.concatenate(0, t_balance) else: self.concatenate(0, t_stim) self.concatenate(s_2, t_inter) if t_balance != 0: self.concatenate(0, t_balance)
[docs] def sinus(self, start, duration, amplitude, freq, offset=0, phase=0, dt=0): """ Create a sinusoidal waveform Parameters ---------- start : float starting time of the waveform, in ms duration : float duration of the waveform, in ms amplitude : float amplitude of the waveform, in uA freq : float frequency of the waveform, in kHz offset : float offset current of the waveform, in uA, by default set to 0 phase : float initial phase of the waveform, in rad, by default set to 0 anod_first : bool if true, stimulation is anodic and begins with the anodic value and is balanced with cathodic value, else stimuation is cathodic and begins with the cathodic value and is balances with anodic value, by default set to False (cathodic first as most stimulation protocols) """ # check the pseudo sampling period if dt == 0: dt = 1 / (freq * 100) elif freq > (1.0 / (2 * dt)): rise_warning( "dt too low in stimulus creation, Shannon criterion not respected" ) Nb_points = int(duration / dt) # create the signal if start == 0: self.s[0] = amplitude * np.sin(phase) + offset t = np.linspace(dt, start + duration, num=Nb_points - 1) s = amplitude * np.sin(2 * np.pi * freq * t + phase) + offset self.concatenate(s, t, t_shift=0) else: t = np.linspace(0, duration, num=Nb_points) s = amplitude * np.sin(2 * np.pi * freq * t + phase) + offset t = np.linspace(start, start + duration, num=Nb_points) # add start self.concatenate(s, t, t_shift=0)
[docs] def harmonic_pulse(self, start, t_pulse, amplitude, amp_list, phase_list, dt=0): """ Create a pulse waveform based on N harmonic Parameters ---------- start : float starting time of the waveform, in ms t_pulse : float pulse duration, in ms amplitude : float final amplitude of the pulse, in uA WARNING: always positive, the user give here the absolute value amp_list : list list of relative sine amplitude, between 0 and 1 phase_list : list list of sine pulse phases dt : float sampling time period to generate the sinusoidal shape. If equal to 0, dt is automatically set to match 100 samples per sinusoid period by default set to 0 """ if len(amp_list) != len(phase_list): rise_error("amp_list and phase_list must be of same length") n_sine = len(amp_list) freq = 1 / (2 * t_pulse) if dt == 0: dt = 1 / (n_sine * freq * 100) elif freq > (1.0 / (2 * dt)): rise_warning( "dt too low in stimulus creation, Shannon criterion not respected" ) Nb_points = int(t_pulse / dt) # create the signal freq_harmonic = freq if start == 0: t = np.linspace(dt, t_pulse, num=Nb_points - 1) self.s[0] = 0 s = 0 for i in range(n_sine): self.s[0] += amp_list[i] * np.sin(phase_list[i]) s += amp_list[i] * np.sin( 2 * np.pi * freq_harmonic * t + phase_list[i] - np.pi ) freq_harmonic = freq * (i + 2) s = s - np.max(s) # shift s to avoid any postive values s = s * amplitude / np.max(np.abs(s)) # rescale s to finale amp s[-1] = 0 self.concatenate(s, t, t_shift=0) else: t = np.linspace(0, t_pulse, num=Nb_points) s = 0 for i in range(n_sine): s += amp_list[i] * np.sin( 2 * np.pi * freq_harmonic * t + phase_list[i] - np.pi ) freq_harmonic = freq * (i + 2) s = s - np.max(s) # shift s to avoid any postive values s = s * amplitude / np.max(np.abs(s)) # rescale s to finale amp s[-1] = 0 t = np.linspace(start, start + t_pulse, num=Nb_points) self.concatenate(s, t, t_shift=0)
[docs] def square(self, start, duration, freq, amplitude, offset, anod_first=False): """ Create a repetitive (periodic) square waveform Parameters ---------- start : float starting time of the waveform, in ms duration : float duration of the waveform, in ms amplitude : float amplitude of the waveform, in uA freq : float frequency of the waveform, in kHz offset : float offset current of the waveform, in uA, by default set to 0 dt : float sampling time period to generate the sinusoidal shape. If equal to 0, dt is automatically set to match 100 samples per sinusoid period by default set to 0 """ Nb_pts = 2 * int(duration / freq) - 1 dt = 1 / (2 * freq) t = np.linspace(0, duration, num=Nb_pts) s = np.ones(Nb_pts) * amplitude k_start = 0 if anod_first: k_start = 1 for k in range(k_start, len(s), 2): s[k] *= -1 s += offset self.concatenate(s, t, t_shift=start)
[docs] def ramp( self, slope, start, duration, dt, bounds=(0, float("inf")), printslope=False ): """ Create a ramp waveform with slop value Parameters ---------- slope : float slope of the waveform, in uA.ms-1 start : float starting time of the waveform, in ms duration : float duration of the waveform, in ms dt : float sampling time period to generate the sinusoidal shape bounds : tuple boundary vaues of the ramp signal printslope : bool, optional if True, the value of the slope is printed on the prompt """ # create the signal if printslope: pass_info("slope = " + str(slope)) Nb_points = int(duration / dt) t = np.array([k * dt for k in range(Nb_points)]) if slope < 0: s = max(bounds) * np.ones(Nb_points) else: s = min(bounds) * np.ones(Nb_points) point_start = int(start / dt) for i in range(point_start, Nb_points): if slope >= 0: s[i] = min(bounds[1], bounds[0] + (i - point_start) * slope * dt) if slope < 0: s[i] = max(bounds[0], bounds[1] + (i - point_start) * slope * dt) self.concatenate(s, t, t_shift=0)
[docs] def ramp_lim(self, tstart, tstop, ampstart, ampmax, duration, dt, printslope=False): """ Create a ramp waveform with bounds values Parameters ---------- ampstart : float initiale amplitude of the waveform, in uA ampmax : float final amplitude of the waveform, in uA tstart : float starting time of the waveform, in ms tmax : float starting time of the waveform, in ms duration : float duration of the waveform, in ms dt : float sampling time period to generate the sinusoidal shape printslope : bool, optional if True, the value of the slope is printed on the prompt """ slope = (ampstart - ampmax) / (tstart - tstop) bounds = (min(ampstart, ampmax), max(ampstart, ampmax)) self.ramp(slope, tstart, duration, dt, bounds, printslope)