"""
NRV-:class:`.stimulus` handling.
"""
import faulthandler
import numpy as np
import matplotlib.pyplot as plt
from ..backend._log_interface import pass_info, rise_warning, rise_error
from ..backend._NRV_Class import NRV_class
from ..backend._extlib_interface import np_trapz
# enable faulthandler to ease "segmentation faults" debug
faulthandler.enable()
###############
## Functions ##
###############
[docs]
def is_stim(stim):
"""
check if an object is a stimulus, return True if yes, else False
Parameters
----------
stim : object
object to test
Returns
-------
bool
True it the type is a stimulus object
"""
return isinstance(stim, stimulus)
[docs]
def set_common_time_series(stim1, stim2):
"""
set two signals at the same time samples, missing samples are reconstructed by holding a value
Parameters
----------
stim1 : stimulus object
First stimulus to synchronize
stim2 : stimulus object
Second object to synchronize
"""
stim1.insert_samples(stim2.t)
stim2.insert_samples(stim1.t)
[docs]
def get_equal_timing_copies(stim1, stim2):
"""
create two stimuli with equal timing as copies of the input ones
Parameters
----------
stim1 : stimulus object
First stimulus to synchronize
stim2 : stimulus object
Second object to synchronize
Returns
--------
stima : stimulus object
copie of stim1 with timings from stim1 and stim2
stimb : stimulus object
copie of stim2 with timings from stim1 and stim2
"""
stim_a = stimulus()
stim_a.s = stim1.s
stim_a.t = stim1.t
stim_b = stimulus()
stim_b.s = stim2.s
stim_b.t = stim2.t
stim_a.insert_samples(stim_b.t)
stim_b.insert_samples(stim_a.t)
return stim_a, stim_b
[docs]
def datfile_2_stim(fname, dt=0.005):
"""Creates a stimulus from sampled data specified in a .dat file
Parameters
----------
fname : str
name of the file containing the sampled data
dt : float, optional
value of the sampling period, in ms. By default set to 0.005ms
"""
stim_1 = stimulus()
# open the dat file and feed the stimulus
data = np.genfromtxt(fname)
stim_1.t = np.arange(len(data)) * dt
stim_1.s = data
return stim_1
################################
## Stimulus object definition ##
################################
[docs]
class stimulus(NRV_class):
"""
Stimulus class for NRV2,
signals are defined as asynchronous signals, with s the values and t as occurence timings.
"""
[docs]
def __init__(self, s_init=0):
"""
Instantiation of a stimulus object.
Parameters
----------
s_init : float
initial value of the signal, by default 0
"""
super().__init__()
self.type = "stimulus"
self.s = np.array([s_init])
self.t = np.array([0])
## Save and Load mehtods
[docs]
def save_stimulus(self, save=False, fname="stimulus.json"):
"""
Deprecated wrapper around :meth:`save`.
Parameters
----------
save : bool, optional
If ``True``, write the stimulus to disk.
fname : str, optional
Output filename.
"""
rise_warning("save_stimulus is a deprecated method use save")
self.save(save=save, fname=fname)
[docs]
def load_stimulus(self, data="stimulus.json"):
"""
Deprecated wrapper around :meth:`load`.
Parameters
----------
data : str | dict, optional
Serialized stimulus or filename to load.
"""
rise_warning("load_stimulus is a deprecated method use load")
self.load(data=data)
############################
## basic handling methods ##
############################
[docs]
def append(self, s, t):
"""
Append a sample to the signal, internal use only.
Parameters
----------
s : float
value of the signal to append, numpy array like
t : float
time of signal change
"""
self.s = np.append(self.s, np.asarray(s))
self.t = np.append(self.t, np.asarray(t))
[docs]
def concatenate(self, s, t, t_shift=0):
"""
Concatenate samples to a signal, internal use only.
Parameters
----------
s : float
values of signal, numpy array like
t : float
time serie of the values, numpy array like
t_shift : float
time to shift the t values at the end of original signal, by default equal to 0.
Here to prevent possible duplicates if the t serie starts with 0
"""
self.s = np.append(self.s, np.asarray(s))
self.t = np.append(self.t, np.asarray(t) + self.t[-1] + t_shift)
[docs]
def len(self):
"""
Returns the length of the signal
Returns
-------
len : int
length of the signal (s and t attribute should have the same length
if the signal is not corrupted)
"""
return len(self.s)
[docs]
def sort(self):
"""
Sort the signal with increasing timings
"""
ordered_ind = np.argsort(self.t)
self.s = self.s[ordered_ind]
self.t = self.t[ordered_ind]
[docs]
def insert_samples(self, t):
"""
Insert samples inside a signal and adapt values consequently
Parameters
----------
t : array, np.array or list
time serie of samples to insert, numpy array-like, dosent have to be sorted
"""
# add the new time values at their place and remove duplicates
new_t = np.unique(np.sort(np.append(self.t, np.asarray(t))))
new_s = [self.s[0]]
j = 1
for k in range(1, len(new_t)):
if j < self.len():
if new_t[k] < self.t[j]: # added sample, hold the last value
new_s.append(new_s[-1])
else: # orinal sample, retrieve value in original signal and increment counter
new_s.append(self.s[j])
j += 1
else:
new_s.append(self.s[-1])
self.s = np.asarray(new_s)
self.t = new_t
[docs]
def snap_time(self, dt_min):
"""
average stimulus values which are separated in time by less than ``dt_min``
"""
i_mask = [True for _ in range(len(self.t))]
for i in range(len(self.t) - 1):
j = 0
if self.t[i + 1] - self.t[i] < dt_min:
while i > j and not i_mask[i - j]:
j += 1
if self.t[i + 1] - self.t[i - j] < dt_min:
i_mask[i + 1] = False
self.s = self.s[i_mask]
self.t = self.t[i_mask]
[docs]
def plot(self, ax: plt.axes, scatter=False, **ax_kwargs):
"""
Plot the stimulus
"""
if scatter:
ax.scatter(self.t, self.s, **ax_kwargs)
else:
ax.step(self.t, self.s, where="post", **ax_kwargs)
#####################
## special methods ##
#####################
def __len__(self):
"""
Return the number of stored samples.
Returns
-------
int
Number of time samples in the stimulus.
"""
return self.len()
def __abs__(self):
"""
Create a stimulus with absolute sample amplitudes.
Returns
-------
stimulus
Absolute-value version of the stimulus.
"""
stim = stimulus()
stim.s = np.absolute(self.s)
stim.t = self.t
return stim
def __neg__(self):
"""
Create a stimulus with sign-inverted amplitudes.
Returns
-------
stimulus
Opposite of the stimulus.
"""
stim = stimulus()
stim.s = -self.s
stim.t = self.t
return stim
def __add__(self, b):
"""
Add a scalar or another stimulus to the current stimulus.
Parameters
----------
b : float | stimulus
Operand added to the stimulus.
Returns
-------
stimulus
Sum of the two operands.
"""
C = stimulus()
if is_stim(b):
A, B = get_equal_timing_copies(self, b)
C.s = A.s + B.s
C.t = A.t
else:
C.s = self.s + float(b)
C.t = self.t
return C
def __sub__(self, b):
"""
Subtract a scalar or another stimulus from the current stimulus.
Parameters
----------
b : float | stimulus
Operand subtracted from the stimulus.
Returns
-------
stimulus
Difference between the two operands.
"""
C = stimulus()
if is_stim(b):
A, B = get_equal_timing_copies(self, b)
C.s = A.s - B.s
C.t = A.t
else:
C.s = self.s - float(b)
C.t = self.t
return C
def __mul__(self, b):
"""
Multiply the stimulus by a scalar or another stimulus.
Parameters
----------
b : float | stimulus
Multiplicative operand.
Returns
-------
stimulus
Product of the two operands.
"""
C = stimulus()
if is_stim(b):
A, B = get_equal_timing_copies(self, b)
C.s = A.s * B.s
C.t = A.t
else:
C.s = self.s * float(b)
C.t = self.t
return C
def __radd__(self, b):
"""
Add the stimulus to a scalar or another stimulus.
Parameters
----------
b : float | stimulus
Left operand.
Returns
-------
stimulus
Sum of the two operands.
"""
C = stimulus()
if is_stim(b):
A, B = get_equal_timing_copies(self, b)
C.s = B.s + A.s
C.t = A.t
else:
C.s = float(b) + self.s
C.t = self.t
return C
def __rsub__(self, b):
"""
Subtract the stimulus from a scalar or another stimulus.
Parameters
----------
b : float | stimulus
Left operand.
Returns
-------
stimulus
Difference between the two operands.
"""
C = stimulus()
if is_stim(b):
A, B = get_equal_timing_copies(self, b)
C.s = B.s - A.s
C.t = A.t
else:
C.s = float(b) - self.s
C.t = self.t
return C
def __rmul__(self, b):
"""
Multiply a scalar or another stimulus by the stimulus.
Parameters
----------
b : float | stimulus
Left operand.
Returns
-------
stimulus
Product of the two operands.
"""
C = stimulus()
if is_stim(b):
A, B = get_equal_timing_copies(self, b)
C.s = B.s * A.s
C.t = A.t
else:
C.s = float(b) * self.s
C.t = self.t
return C
def __pow__(self, p):
"""
Raise all stimulus amplitudes to a scalar power.
Parameters
----------
p : float
Power applied to each sample value.
Returns
-------
stimulus
Powered stimulus.
"""
C = stimulus()
C.s = self.s ** float(p)
return C
def __eq__(self, b):
"""
Compare the stimulus to a scalar or another stimulus.
Parameters
----------
b : float | stimulus
Object to compare with.
Returns
-------
bool
``True`` if both operands are equal sample-wise.
"""
if is_stim(b):
A, B = get_equal_timing_copies(self, b)
flag = np.array_equal(A.s, B.s) and np.array_equal(A.t, B.t)
else:
flag = True
for value in self.s:
if value != b:
flag = False
return flag
def __ne__(self, b): # self != b
"""
Compare the stimulus for inequality.
Parameters
----------
b : float | stimulus
Object to compare with.
Returns
-------
bool
``True`` if the operands are not equal.
"""
return not self == b
[docs]
def integrate(self):
"""
Integrate the stimulus over time.
Returns
-------
float
Time integral of the stimulus waveform.
"""
return np_trapz(self.s, x=self.t)
#######################
## signal generators ##
#######################
[docs]
def constant(self, value, start=0):
"""
Ceat a constant signal
Parameters
----------
value : float
Value of the constant signal
start :float
starting time of the constant signal, by default set to 0
"""
self.append(value, start)
[docs]
def pulse(self, start, value, duration=0):
"""
Create pulse shape signal samples
Parameters
----------
value : float
value of the pulse
start : float
starting time of the pulse
duration : float
duration of the pulse, optional and by default equal to zero, if non zero value,
a point is added at the end of the pulse with the previous value
"""
s_last = self.s[-1]
self.append(value, start)
if duration != 0:
self.append(s_last, start + duration)
[docs]
def biphasic_pulse(
self, start, s_cathod, t_stim, s_anod, t_inter, anod_first=False
):
"""
Create a biphasic pulse waveform
Parameters
----------
start : float
starting time of the waveform, in ms
s_cathod : float
cathodic (negative stimulation value) current, in uA
t_stim : float
stimulation time, in ms
s_anod : float
anodic (positive stimulation value) current and, in uA
t_inter : float
inter pulse timing, in ms
anod_first : bool
if true, stimulation is anodic and begins with the anodic value
and is balanced with cathodic value, else stimuation is cathodic
and begins with the cathodic value and is balances with anodic value,
by default set to False (cathodic first as most stimulation protocols)
Warning
-------
`s_cathod` must always positive, the user give here the absolute value
"""
if not anod_first:
s_1 = -s_cathod
s_2 = s_anod
else:
s_2 = -s_cathod
s_1 = s_anod
if s_2 != 0:
t_balance = abs(s_1 / s_2) * t_stim
else:
t_balance = 0
self.concatenate(s_1, start)
if t_inter == 0:
self.concatenate(s_2, t_stim)
if t_balance != 0:
self.concatenate(0, t_balance)
else:
self.concatenate(0, t_stim)
self.concatenate(s_2, t_inter)
if t_balance != 0:
self.concatenate(0, t_balance)
[docs]
def sinus(self, start, duration, amplitude, freq, offset=0, phase=0, dt=0):
"""
Create a sinusoidal waveform
Parameters
----------
start : float
starting time of the waveform, in ms
duration : float
duration of the waveform, in ms
amplitude : float
amplitude of the waveform, in uA
freq : float
frequency of the waveform, in kHz
offset : float
offset current of the waveform, in uA, by default set to 0
phase : float
initial phase of the waveform, in rad, by default set to 0
anod_first : bool
if true, stimulation is anodic and begins with the anodic value
and is balanced with cathodic value, else stimuation is cathodic
and begins with the cathodic value and is balances with anodic value,
by default set to False (cathodic first as most stimulation protocols)
"""
# check the pseudo sampling period
if dt == 0:
dt = 1 / (freq * 100)
elif freq > (1.0 / (2 * dt)):
rise_warning(
"dt too low in stimulus creation, Shannon criterion not respected"
)
Nb_points = int(duration / dt)
# create the signal
if start == 0:
self.s[0] = amplitude * np.sin(phase) + offset
t = np.linspace(dt, start + duration, num=Nb_points - 1)
s = amplitude * np.sin(2 * np.pi * freq * t + phase) + offset
self.concatenate(s, t, t_shift=0)
else:
t = np.linspace(0, duration, num=Nb_points)
s = amplitude * np.sin(2 * np.pi * freq * t + phase) + offset
t = np.linspace(start, start + duration, num=Nb_points) # add start
self.concatenate(s, t, t_shift=0)
[docs]
def harmonic_pulse(self, start, t_pulse, amplitude, amp_list, phase_list, dt=0):
"""
Create a pulse waveform based on N harmonic
Parameters
----------
start : float
starting time of the waveform, in ms
t_pulse : float
pulse duration, in ms
amplitude : float
final amplitude of the pulse, in uA
WARNING: always positive, the user give here the absolute value
amp_list : list
list of relative sine amplitude, between 0 and 1
phase_list : list
list of sine pulse phases
dt : float
sampling time period to generate the sinusoidal shape. If equal to 0,
dt is automatically set to match 100 samples per sinusoid period by default set to 0
"""
if len(amp_list) != len(phase_list):
rise_error("amp_list and phase_list must be of same length")
n_sine = len(amp_list)
freq = 1 / (2 * t_pulse)
if dt == 0:
dt = 1 / (n_sine * freq * 100)
elif freq > (1.0 / (2 * dt)):
rise_warning(
"dt too low in stimulus creation, Shannon criterion not respected"
)
Nb_points = int(t_pulse / dt)
# create the signal
freq_harmonic = freq
if start == 0:
t = np.linspace(dt, t_pulse, num=Nb_points - 1)
self.s[0] = 0
s = 0
for i in range(n_sine):
self.s[0] += amp_list[i] * np.sin(phase_list[i])
s += amp_list[i] * np.sin(
2 * np.pi * freq_harmonic * t + phase_list[i] - np.pi
)
freq_harmonic = freq * (i + 2)
s = s - np.max(s) # shift s to avoid any postive values
s = s * amplitude / np.max(np.abs(s)) # rescale s to finale amp
s[-1] = 0
self.concatenate(s, t, t_shift=0)
else:
t = np.linspace(0, t_pulse, num=Nb_points)
s = 0
for i in range(n_sine):
s += amp_list[i] * np.sin(
2 * np.pi * freq_harmonic * t + phase_list[i] - np.pi
)
freq_harmonic = freq * (i + 2)
s = s - np.max(s) # shift s to avoid any postive values
s = s * amplitude / np.max(np.abs(s)) # rescale s to finale amp
s[-1] = 0
t = np.linspace(start, start + t_pulse, num=Nb_points)
self.concatenate(s, t, t_shift=0)
[docs]
def square(self, start, duration, freq, amplitude, offset, anod_first=False):
"""
Create a repetitive (periodic) square waveform
Parameters
----------
start : float
starting time of the waveform, in ms
duration : float
duration of the waveform, in ms
amplitude : float
amplitude of the waveform, in uA
freq : float
frequency of the waveform, in kHz
offset : float
offset current of the waveform, in uA, by default set to 0
dt : float
sampling time period to generate the sinusoidal shape. If equal to 0,
dt is automatically set to match 100 samples per sinusoid period by default set to 0
"""
Nb_pts = 2 * int(duration / freq) - 1
dt = 1 / (2 * freq)
t = np.linspace(0, duration, num=Nb_pts)
s = np.ones(Nb_pts) * amplitude
k_start = 0
if anod_first:
k_start = 1
for k in range(k_start, len(s), 2):
s[k] *= -1
s += offset
self.concatenate(s, t, t_shift=start)
[docs]
def ramp(
self, slope, start, duration, dt, bounds=(0, float("inf")), printslope=False
):
"""
Create a ramp waveform with slop value
Parameters
----------
slope : float
slope of the waveform, in uA.ms-1
start : float
starting time of the waveform, in ms
duration : float
duration of the waveform, in ms
dt : float
sampling time period to generate the sinusoidal shape
bounds : tuple
boundary vaues of the ramp signal
printslope : bool, optional
if True, the value of the slope is printed on the prompt
"""
# create the signal
if printslope:
pass_info("slope = " + str(slope))
Nb_points = int(duration / dt)
t = np.array([k * dt for k in range(Nb_points)])
if slope < 0:
s = max(bounds) * np.ones(Nb_points)
else:
s = min(bounds) * np.ones(Nb_points)
point_start = int(start / dt)
for i in range(point_start, Nb_points):
if slope >= 0:
s[i] = min(bounds[1], bounds[0] + (i - point_start) * slope * dt)
if slope < 0:
s[i] = max(bounds[0], bounds[1] + (i - point_start) * slope * dt)
self.concatenate(s, t, t_shift=0)
[docs]
def ramp_lim(self, tstart, tstop, ampstart, ampmax, duration, dt, printslope=False):
"""
Create a ramp waveform with bounds values
Parameters
----------
ampstart : float
initiale amplitude of the waveform, in uA
ampmax : float
final amplitude of the waveform, in uA
tstart : float
starting time of the waveform, in ms
tmax : float
starting time of the waveform, in ms
duration : float
duration of the waveform, in ms
dt : float
sampling time period to generate the sinusoidal shape
printslope : bool, optional
if True, the value of the slope is printed on the prompt
"""
slope = (ampstart - ampmax) / (tstart - tstop)
bounds = (min(ampstart, ampmax), max(ampstart, ampmax))
self.ramp(slope, tstart, duration, dt, bounds, printslope)
[docs]
def gaussian_noise(self, tstart, tstop, standard_dev, offset=0, dt=0.005):
"""
Create a gaussian noise signal
Parameters
----------
tstart : float
starting time of the waveform, in ms
tstop : float
stopping time of the waveform, in ms
standard_dev : float
standard deviation of the gaussian noise, in uA
offset : float
offset current of the waveform, in uA, by default set to 0
dt : float
sampling time period to generate the sinusoidal shape, by default set to 0.005ms
"""
Nb_points = int((tstop - tstart) / dt) + 1
t = np.linspace(tstart, tstop, num=Nb_points)
s = np.random.normal(loc=offset, scale=standard_dev, size=Nb_points)
self.concatenate(s, t, t_shift=tstart)