Source code for nrv.utils._nrv_function

"""
NRV-:class:`nrv_function` handling.
"""

from abc import abstractmethod

import numpy as np
from scipy.interpolate import CubicHermiteSpline, interp1d
from scipy.special import erf

from ..backend._file_handler import json_dump, json_load
from ..backend._log_interface import rise_warning, rise_error
from ..backend._NRV_Class import NRV_class, is_empty_iterable

#############################
## sigma functions classes ##
#############################
spy_interp1D_kind = [
    "linear",
    "nearest",
    "nearest-up",
    "zero",
    "slinear",
    "quadratic",
    "cubic",
    "previous",
    "next",
]


[docs] class nrv_function(NRV_class): """ Class containg all comon method of fonction used in nrv """
[docs] @abstractmethod def __init__(self): super().__init__() self.f_type = "nrv_function" self.dim = 0
def __call__(self, *X): return self.call_method(*X)
[docs] @staticmethod def call_method(self, X): return X
def __compatible(self, b): if not callable(b): return True elif "dim" in self.__dir__() and "dim" in b.__dir__(): if self.dim == b.dim: return True else: rise_warning("Type not compatible for operation", type(self), type(b)) return False def __neg__(self): c = eval(self.f_type)() c.call_method = lambda *X: -self(*X) return c def __abs__(self): c = eval(self.f_type)() c.call_method = lambda *X: abs(self(*X)) return c def __add__(self, b): if self.__compatible(b): c = eval(self.f_type)() if callable(b): c.call_method = lambda *X: self(*X) + b(*X) else: c.call_method = lambda *X: self(*X) + b return c def __radd__(self, b): if self.__compatible(b): c = eval(self.f_type)() if callable(b): c.call_method = lambda *X: b * (X) + self(*X) else: c.call_method = lambda *X: b + self(*X) return c def __sub__(self, b): if self.__compatible(b): c = eval(self.f_type)() if callable(b): c.call_method = lambda *X: self(*X) - b(*X) else: c.call_method = lambda *X: self(*X) - b return c def __rsub__(self, b): if self.__compatible(b): c = eval(self.f_type)() if callable(b): c.call_method = lambda *X: b(*X) - self(*X) else: c.call_method = lambda *X: b - self(*X) return c def __mul__(self, b): if self.__compatible(b): c = eval(self.f_type)() if callable(b): c.call_method = lambda *X: self(*X) * b(*X) else: c.call_method = lambda *X: self(*X) * b return c def __rmul__(self, b): if self.__compatible(b): c = eval(self.f_type)() if callable(b): c.call_method = lambda *X: b(*X) * self(*X) else: c.call_method = lambda *X: b * self(*X) return c
############################################################### ####################### 1D functions ########################## ###############################################################
[docs] class function_1D(nrv_function): """ class containing function from IR to IR Such function can be call either on 1 value or on a ndarray (and applied on each value) """
[docs] def __init__(self): super().__init__() self.f_type = "function_1D" self.dim = 1
# @staticmethod
[docs] def call_method(self, X): # Composition with N dim function if isinstance(X, nrv_function): c_type = "function_" + str(X.dim) + "D" c = eval(c_type)() c.call_method = lambda *x: self(X(*x)) return c else: return None
[docs] class gaussian(function_1D): r""" gaussian function define as: .. math:: f(x) = e^{-\frac{(x-\mu)^2}{2*\sigma^2}} """
[docs] def __init__(self, mu=0, sigma=1): """ """ super().__init__() self.mu = mu self.sigma = sigma
[docs] def call_method(self, X): res = super().call_method(X) if res is None: res = np.exp( -(((X - self.mu) / self.sigma) ** 2) / 2 ) # /(self.sigma*(2*np.pi)**0.5) return res
[docs] class gate(function_1D):
[docs] def __init__(self, mu=0, sigma=1, kind="Rational", N=None): """ """ super().__init__() # self.kind = self.mu = mu self.sigma = sigma self.N = N self.kind = kind
[docs] def call_method(self, X): res = super().call_method(X) if res is None: X_eff = (X - self.mu) / self.sigma if self.N is None: res = (np.sign(X_eff + 0.5) - np.sign(X_eff - 0.5)) / 2 elif self.kind.lower() == "rational": res = 1 / ((2 * X_eff) ** (2 * self.N) + 1) else: res = ( erf((X_eff + (0.5 * self.sigma) / self.mu) / self.N) - erf((X_eff - (0.5 * self.sigma) / self.mu) / self.N) ) / 2 return res
############################################################### ####################### 2D functions ########################## ###############################################################
[docs] class function_2D(nrv_function): """ class containing function from IR^2 to IR Such function can be call either on 1 value or on a ndarray (and applied on each value) """
[docs] def __init__(self): super().__init__() self.f_type = "function_2D" self.dim = 2
[docs] @staticmethod def call_method(self, X): return 1
[docs] class ackley(function_2D): """ Bi-dimensional Ackley function """
[docs] def __init__(self): super().__init__()
[docs] def call_method(self, x: np.array, y: np.array) -> np.array: return ( -20 * np.exp(-0.2 * np.sqrt(0.5 * (x**2 + y**2))) - np.exp(0.5 * (np.cos(2 * np.pi * x) + np.cos(2 * np.pi * y))) + np.e + 20 )
[docs] class beale(function_2D): """ Bi-dimensional Beale function """
[docs] def __init__(self): super().__init__()
[docs] def call_method(self, x: np.array, y: np.array) -> np.array: return ( (1.5 - x + x * y) ** 2 + (2.25 - x + x * y**2) ** 2 + (2.625 - x + x * y**3) ** 2 )
[docs] class goldstein_price(function_2D): """ Bi-dimensional Goldstein function """
[docs] def __init__(self): super().__init__()
[docs] def call_method(self, x: np.array, y: np.array) -> np.array: return ( 1 + (x + y + 1) ** 2 * (19 - 14 * x + 3 * x**2 - 14 * y + 6 * x * y + 3 * y**2) ) * ( 30 + (2 * x - 3 * y) ** 2 * (18 - 32 * x + 12 * x**2 + 48 * y - 36 * x * y + 27 * y**2) )
[docs] class booth(function_2D): """ Bi-dimensional Booth function """
[docs] def __init__(self): super().__init__()
[docs] def call_method(self, x: np.array, y: np.array) -> np.array: return (x + 2 * y - 7) ** 2 + (2 * x + y - 5) ** 2
[docs] class bukin6(function_2D): """ Bi-dimensional Booth function """
[docs] def __init__(self): super().__init__()
[docs] def call_method(self, x: np.array, y: np.array) -> np.array: return 100 * np.sqrt(np.abs(y - 0.01 * x**2)) + 0.01 * np.abs(x + 10)
############################################################### ####################### ND functions ########################## ###############################################################
[docs] class function_ND(nrv_function): """ class containing function from IR^n to IR Such function can be call either on 1 value or on a ndarray (and applied on each value) """
[docs] def __init__(self): super().__init__() self.f_type = "function_ND" self.dim = "N"
[docs] @staticmethod def call_method(self, *X): return 1
[docs] class Id(function_ND): """ """
[docs] def __init__(self): super().__init__()
[docs] def call_method(self, *x: np.array) -> np.array: if len(x) == 1 and np.iterable(x[0]): x = x[0] return x
[docs] class rosenbock(function_ND): """ Multi-dimensional Rosenbock function """
[docs] def __init__(self): super().__init__()
[docs] def call_method(self, *x: np.array) -> np.array: if len(x) == 1 and np.iterable(x[0]): x = x[0] result = 0 for i in range(1, len(x)): result += 100 * (x[i] - x[i - 1] ** 2) ** 2 + (1 - x[i - 1]) ** 2 return result
[docs] class rastrigin(function_ND): """ Multi-dimensional Rastrigin function """
[docs] def __init__(self): super().__init__()
[docs] def call_method(self, *x: np.array) -> np.array: if len(x) == 1 and np.iterable(x[0]): x = x[0] A = 10.0 n = len(x) result = A * n for xi in x: result += xi**2 - A * np.cos(2 * np.pi * xi) return result
[docs] class sphere(function_ND): """ Multi-dimensional Sphere function """
[docs] def __init__(self, Xc=None): super().__init__() self.Xc = Xc if self.Xc is None: self.Xc = []
[docs] def call_method(self, *X): if len(X) == 1 and np.iterable(X[0]): X = X[0] Nc = len(self.Xc) res = 0 for i, xi in enumerate(X): if i < Nc: xc = self.Xc[i] else: xc = 0 res += (xi - xc) ** 2 return res**0.5
########################################################### #################### cost_evaluation ##################### ###########################################################
[docs] class cost_evaluation(nrv_function):
[docs] def __init__(self): super().__init__() self.f_type = "cost_evaluation"
[docs] @staticmethod def call_method(self, static_sim) -> float: return 1
def __call__(self, results, **kwargs) -> float: return self.call_method(results)
############################################################### ####################### interpolation ###################### ###############################################################
[docs] class nrv_interp(nrv_function): """ Interpolator based on scipy.interpolate Parameters ---------- X_values : (N,) array_like A 1-D array of real values Y_values :(…,N,…) array_like A N-D array of real values. The length of y along the interpolation axis must be equal to the length of x. kind : str kind of interpolation which should be used, by default "linear" possible values: - scipy interp1d kinds (see nrv.spy_interp1D_kind) - {'hermite', 'cardinal', 'catmull-rom'} for respective cubic spline interpollation dx if minimal distance between 2 consecutive points, by default 0.01 interpolator custumize intepolation function if None kind is use to set the interpolator, by default None dxdy if kind is 'hermite', derivative vector (see scipy.interpolate.CubicHermiteSpline), by default None scale if kind is 'cardinal', scale to use for derivative vector, by default None columns : int or array_like columns on which apply the interpolation when class called if [] iterpolation done on the whole vector,by default [] Returns ------- output : tuple(3) (domain, cell_tag, facet_tag) """
[docs] def __init__( self, X_values, Y_values, kind="linear", dx=0.01, interpolator=None, dxdy=None, scale=None, columns=[], ): super().__init__() self.f_type = "nrv_interp" # General parameters self.X_values = X_values self.Y_values = Y_values self.N_pts = len(Y_values) self.interpolator = interpolator self.kind = kind self.dx = dx # CubicHermiteSpline parameters self.dxdy = dxdy # Cardinal Spline parameters self.scale = scale self.columns = columns self.__set_interp()
def __set_interp(self): """ Internal use only, set interpolator """ self.N_pts = len(self.X_values) self.__check_X_values() self.update_interpolator() def __check_X_values(self, dx=None): """ """ S = self.X_values.argsort() self.X_values = self.X_values[S] self.Y_values = self.Y_values[S] if dx is not None: self.dx = dx for i, x in enumerate(self.X_values): if i > 0: while x < self.X_values[i - 1]: x += dx self.X_values[i] = x
[docs] def update_interpolator(self, kind=None, interpolator=None, dxdy=None, scale=None): """ """ if kind is not None: self.kind = kind if interpolator is not None: self.interpolator = interpolator elif self.kind.lower() in spy_interp1D_kind: self.interpolator = interp1d( self.X_values, self.Y_values, kind=self.kind, fill_value="extrapolate" ) elif self.kind.lower() in ["hermite", "cardinal", "catmull-rom"]: if self.scale is None or self.kind == "catmull-rom": self.scale = 0.5 if self.dxdy is None: self.dxdy = np.array( [ (self.Y_values[1] - self.Y_values[0]) / self.X_values[1] - self.X_values[0] ] + [ (self.Y_values[k + 2] - self.Y_values[k]) / (self.X_values[k + 2] - self.X_values[k]) for k in range(self.N_pts - 2) ] + [ (self.Y_values[-1] - self.Y_values[-2]) / (self.X_values[-1] - self.X_values[-2]) ] ) if self.kind.lower() == "hermite" and dxdy is not None: self.dxdy = dxdy self.scale = 1 if self.kind.lower() == "cardinal" and scale is not None: self.scale = scale self.interpolator = CubicHermiteSpline( self.X_values, self.Y_values, self.scale * self.dxdy )
# Mathematical operations def __add__(self, b): if isinstance(b, nrv_interp): if np.allclose(b.X_values, self.X_values): Y = b.Y_values + self.Y_values # TO ADD: computation of dxdy when both are set else: rise_error( "Not implemented: operations of nrv_iterp with different X_scale" ) else: Y = self.Y_values + b return nrv_interp( X_values=self.X_values, Y_values=Y, interpolator=self.interpolator, kind=self.kind, dx=self.dx, dxdy=None, scale=self.scale, columns=self.columns, ) def __mul__(self, b): if isinstance(b, nrv_interp): if np.allclose(b.X_values, self.X_values): Y = self.Y_values * b.Y_values # TO ADD: computation of dxdy when both are set else: rise_error( "Not implemented: operations of nrv_iterp with different X_scale" ) else: Y = self.Y_values * b return nrv_interp( X_values=self.X_values, Y_values=Y, interpolator=self.interpolator, kind=self.kind, dx=self.dx, dxdy=None, scale=self.scale, columns=self.columns, ) def __truediv__(self, b): if isinstance(b, nrv_interp): if np.allclose(b.X_values, self.X_values): if np.isclose(b.Y_values, 0).any(): rise_error(ZeroDivisionError, "float division by zero in Python") Y = self.Y_values / b.Y_values # TO ADD: computation of dxdy when both are set else: rise_error( NotImplementedError, "operations of nrv_iterp with different X_scale", ) else: Y = self.Y_values / b return nrv_interp( X_values=self.X_values, Y_values=Y, interpolator=self.interpolator, kind=self.kind, dx=self.dx, dxdy=None, scale=self.scale, columns=self.columns, ) def __rtruediv__(self, b): if np.isclose(self.Y_values, 0).any(): rise_error(ZeroDivisionError, " float division by zero in Python") if isinstance(b, nrv_interp): if np.allclose(b.X_values, self.X_values): Y = b.Y_values / self.Y_values # TO ADD: computation of dxdy when both are set else: rise_error( NotImplementedError, "operations of nrv_iterp with different X_scale", ) else: Y = b / self.Y_values return nrv_interp( X_values=self.X_values, Y_values=Y, interpolator=self.interpolator, kind=self.kind, dx=self.dx, dxdy=self.dxdy, scale=self.scale, columns=self.columns, ) def __radd__(self, b): return self.__add__(b) def __rmul__(self, b): return self.__mul__(b) def __sub__(self, b): return self.__add__(-b) def __rsub__(self, b): return self.__sub__(b) def __call__(self, X): if is_empty_iterable(self.columns): return self.interpolator(X) try: return self.interpolator(X[self.columns]) except: rise_warning( "nrv_interpol: columns out of bound, intepolation done on the whole vector" ) return self.interpolator(X)
[docs] class MeshCallBack(nrv_function): """ """
[docs] def __init__(self, f=None, axis="x"): super().__init__() self.f_type = "MeshCallBack" self.f = None self.axis = axis self.set_function(f)
[docs] def set_function(self, f=None): if f is None: self.f = lambda *x: 1 elif callable(f): self.f = f elif isinstance(f, str): self.f = lambda x: eval(f) else: rise_warning(type(f), "Not recognized for MeshCallBack function")
def __call__(self, dim, tag, x, y, z, lc): X = [] if "x" in self.axis: X += [x] if "y" in self.axis: X += [y] if "z" in self.axis: X += [z] return lc * self.f(*X)