Source code for nrv.optim._Optimizers

"""
NRV-:class:`.Optimizer` handling.
"""

from abc import ABCMeta, abstractmethod
import os
from typing import Any
import numpy as np
import matplotlib.pyplot as plt
from time import perf_counter, asctime, localtime
import traceback
import json
import sys
import pyswarms as ps
from pyswarms.backend.topology import Star, Ring
import scipy.optimize as scpopt


from ..backend._NRV_Class import NRV_class
from ..backend._parameters import parameters
from ..backend._log_interface import set_log_level, rise_error, rise_warning

from .optim_utils._OptimResults import optim_results

dir_path = parameters.nrv_path + "/_misc"
dir_path + "/log/NRV.log"
# mymodule.py


[docs] class Optimizer(NRV_class, metaclass=ABCMeta): """ Abstract base class for all NRV optimizers. """
[docs] @abstractmethod def __init__(self, method=None): """ Initialize an optimizer backend. Parameters ---------- method : str | None, optional Human-readable identifier of the optimization method. """ super().__init__() self._method = method self.swarm_optimizer = False
[docs] def minimize(self, f: callable, **kwargs) -> optim_results: """ Minimze the function ``f`` using the `optimzer`'s parameters Parameters ---------- f : callable function to minimize kwargs : dict containing parameters to change in the class (:class:``PSO_optimizer``) Returns ------- optim_results results of the optimization """ self.set_parameters(**kwargs) results = optim_results(self.save(save=False)) results["date"] = asctime(localtime()) results["status"] = "Processing" return results
def __call__(self, f: callable, **kwargs: Any) -> optim_results: """ Alias of :meth:`minimize`. """ return self.minimize(f, **kwargs)
[docs] class scipy_optimizer(Optimizer): """ Parameters ---------- method : str | callable, optional Type of solver. Should be one of - 'Nelder-Mead' - 'Powell' - 'CG' - 'BFGS' - 'Newton-CG' - 'L-BFGS-B' - 'TNC' - 'COBYLA' - 'SLSQP' - 'trust-constr' - 'dogleg' - 'trust-ncg' - 'trust-exact' - 'trust-krylov' - custom - a callable object x0 : np.ndarray, optional Initial guess. Array of real elements of size (n,), where n is the number of independent variables,. by default None args : tuple, optional Extra arguments passed to the objective function and its derivatives (fun, jac and hess functions), by default () jac : {callable, '2-point', '3-point', 'cs', bool}, optional Method for computing the gradient vector, optional _description_, by default None hess : {callable, '2-point', '3-point', 'cs', HessianUpdateStrategy}, optional Method for computing the Hessian matrix, by default None hessp : callable, optional Hessian of objective function times an arbitrary vector p. Only for Newton-CG, trust-ncg, trust-krylov, trust-constr, by default None bounds : sequence or `Bounds`, optional Bounds on variables for Nelder-Mead, L-BFGS-B, TNC, SLSQP, Powell, trust-constr, and COBYLA methods., by default None constraints : {Constraint, dict} or List of {Constraint, dict}, optional Constraints definition. Only for COBYLA, SLSQP and trust-constr, by default () tol : float, optional Tolerance for termination, by default None callback : callable, optional A callable called after each iteration, by default None maxiter : int, optional _description_, by default None options : dict, optional A dictionary of solver options, by default None dimension : int, optional _description_, by default None normalize : bool, optional _description_, by default False Note ---- The following arguments are those used in scipy.optimize.minimize (see scipy documentation [2] for more informations): ``x0``, ``args``, ``jac``, ``hess``, ``hessp``, ``bounds``, ``constraints``, ``tol``, ``callback``, ``options`` Examples -------- The following lines shows how `scipy_optimizer` can be used to minimze a 2D sphere function. >>> import nrv >>> my_cost1 = nrv.sphere() >>> my_opt = nrv.scipy_optimizer(dimension=2, x0= [100, 10], maxiter=10) >>> res = my_opt.minimize(my_cost1) >>> print("best position",res.x, "best cost", res.best_cost) best position [-1.52666570e-08 -1.32835147e-07] best cost 1.337095622502969e-07 References ---------- [1] `scipy <https://docs.scipy.org/doc/scipy/>`_ [2] `scipy.optimize.minimize <https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.minimize.html#scipy.optimize.minimize>`_ """
[docs] def __init__( self, method: str = None, x0: np.ndarray = None, args: tuple = (), jac: callable = None, hess: callable = None, hessp: callable = None, bounds: tuple = None, constraints: dict = (), tol: float = None, callback: callable = None, maxiter: int = None, options: dict = None, dimension: int = None, normalize: bool = False, ): """ Initialize a SciPy-based optimizer wrapper. """ if method is None: super().__init__("scipy_default") else: super().__init__("scipy_" + method) self.x0 = x0 self.args = args self.scipy_method = method self.jac = jac self.hess = hess self.hessp = hessp self.bounds = bounds self.constraints = constraints self.tol = tol self.callback = callback self.options = options or {} self.maxiter = maxiter self.dimensions = dimension self.normalize = normalize self.scale_translation = None self.scale_homothety = None self.scaled_bounds = None
def __update_dimensions(self, results): """ """ if self.bounds is None: if self.dimensions is not None: self.bounds = [(None, None) for _ in range(self.dimensions)] if self.x0 is None: if self.bounds is None: rise_error( "scipy optimizer: at least x0 or boundaries should be initiate" ) else: self.dimensions = len(self.bounds) self.x0 = [] for i in range(self.dimensions): min, max = self.bounds[i] if min is None: min = np.finfo(np.float32).min if max is None: max = np.finfo(np.float32).max self.x0 += [np.random.uniform(min, max)] results["x0"] = self.x0 else: self.dimensions = len(self.x0) results["dimensions"] = self.dimensions def __normalize_bound(self, results): """ Normalize optimization bounds to the unit hypercube when requested. Parameters ---------- results : optim_results Results dictionary updated in place with scaling information. """ self.scale_translation = np.zeros(self.dimensions) self.scale_homothety = np.ones(self.dimensions) if not self.normalize: self.scaled_bounds = self.bounds else: self.scaled_bounds = [] for i, bd in enumerate(self.bounds): self.scaled_bounds += [(0, 1)] self.scale_translation[i] = bd[0] self.scale_homothety[i] = bd[1] - bd[0] results["scale_translation"] = self.scale_translation results["scale_homothety"] = self.scale_homothety results["scaled_bounds"] = self.scaled_bounds
[docs] def minimize(self, f: callable, **kwargs) -> optim_results: """ Minimize a scalar cost function using :func:`scipy.optimize.minimize`. Parameters ---------- f : callable Objective function taking one parameter vector. **kwargs : dict Optimizer parameters overriding the current instance attributes. Returns ------- optim_results Optimization results. """ results = super().minimize(f, **kwargs) if self.maxiter is not None: self.options["maxiter"] = self.maxiter self.__update_dimensions(results) self.__normalize_bound(results) results["cost_history"] = [] results["position_history"] = [] def f_history(x): x = (results["scale_homothety"] * x) + results["scale_translation"] c = f(x) results["position_history"].append([x]) results["cost_history"].append(c) return c t0 = perf_counter() res = scpopt.minimize( fun=f_history, x0=self.x0, args=self.args, method=self.scipy_method, jac=self.jac, hess=self.hess, hessp=self.hessp, bounds=self.scaled_bounds, constraints=self.constraints, tol=self.tol, callback=self.callback, options=self.options, ) res.x = (results["scale_homothety"] * res.x) + results["scale_translation"] t_opt = perf_counter() - t0 results["optimization_time"] = t_opt results["best_cost"] = res.fun results["best_position"] = res.x results.update(res) # hess_inv cannot be converted to json if "hess_inv" in results: del results["hess_inv"] results["status"] = "Completed" return results
[docs] class PSO_optimizer(Optimizer): """ Perform a Particle swarm optimization (PSO) on with a defined cost function using pyswarms library[1] Parameters ---------- n_particles : int number of particle of the swarm, by default 5 dimensions : int number of dimensions of each particle options : dict hyperparameter of the PSO maxiter : int number of iteration of the PSO n_processes : int number of process used to parallelize cost calculation, by default None bounds : tupple bounds of the particle, if equal no bounds, by default (0, 0) init_pos : array initial position of the particles if None random, by default None print_time : bool if True, print the optimisation time, by default True opt_type : str Neightboorhood type, by default "global" type possibly: - "global" : Global best PSO (star topology) - "local" : Local best PSO (ring topology) static : bool if False and opt_type is local, update the neigthboorhood of each particle every iterations bh_strategy : str out of bound position strategy for pyswarms optimizer [2]: - "nearest" : Round the value to the nearest bound (default) - "periodic" : set to the modulus of the value between the two bounds - "random" : set to a random value - "shrink" : reduce the velocity to finish land on the bound - "reflective" : mirror the position form inside to ouside the bounds - "intermediate" : set to intermediate value between previous pos and bound oh_strategy : dict (like {"w":str, "c1":str, "c2":str}) Dynamic options strategy for pyswarms optimizer [3], if None static options, by default None: - "exp_decay" : Decreases the parameter exponentially between limits - "lin_variation" : Decreases/increases the parameter linearly between limits - "nonlin_mod" : Decreases/increases the parameter between limits according to a nonlinear modulation index - "rand" : takes a uniform random value between limits ftol : float relative error in objective_func(best_pos) acceptable for convergence, if None -np.inf default None ftol : int number of iterations over which the relative error in objective_func(best_pos) is acceptable for convergence, by default 1 save_results : bool save or not the output in a .json file, by default False saving_file : str name of the file on wich the output should be saved, by default "pso_results.json" Returns ------- results : optim_results contains all the parameters and outputs of the PSO References ---------- links to pyswarms doc: [1] `Pyswams <https://pyswarms.readthedocs.io/en/latest/index.html>`_ [2] `Pyswams handlers <https://pyswarms.readthedocs.io/en/latest/api/pyswarms.handlers.html>`_ """
[docs] def __init__( self, n_particles=5, bounds=(0, 0), dimensions=50, options=None, maxiter=1, n_processes=None, init_pos=None, print_time=False, opt_type="global", static=False, ftol=None, ftol_iter=1, bh_strategy="nearest", oh_strategy=None, save_results=False, saving_file="pso_results.json", comment=None, ): """ Initialize a particle-swarm optimizer wrapper. """ super().__init__("PSO") self.swarm_optimizer = True self.n_particles = n_particles self.dimensions = dimensions self.options = options self.maxiter = maxiter self.n_processes = n_processes self.bounds = bounds self.init_pos = init_pos self.print_time = print_time self.opt_type = opt_type self.static = static self.ftol = ftol self.ftol_iter = ftol_iter self.bh_strategy = bh_strategy self.oh_strategy = oh_strategy self.save_results = save_results self.saving_file = saving_file self.comment = comment
def __nrv2pyswarms_bounds(self): """ Convert NRV-style bounds into the format expected by PySwarms. Returns ------- tuple[np.ndarray, np.ndarray] | None Lower and upper bounds arrays, or ``None`` when no bounds are applied. """ if np.size(self.bounds) == 2 * self.dimensions: if isinstance(self.bounds[0], tuple) and np.size(self.bounds[0]) == 2: max_bound = np.zeros(self.dimensions) min_bound = np.zeros(self.dimensions) for i, bd in enumerate(self.bounds): max_bound[i] = max(bd) min_bound[i] = min(bd) return (min_bound, max_bound) if np.size(self.bounds) > 2: return self.bounds elif self.bounds[0] == self.bounds[1]: return None else: max_bound = max(self.bounds) * np.ones(self.dimensions) min_bound = min(self.bounds) * np.ones(self.dimensions) return (min_bound, max_bound)
[docs] def minimize(self, f_swarm: callable, **kwargs) -> optim_results: """ Perform a Particle swarm optimization Parameters ---------- f_swarm : callable function taking in parameter a swarm (dim-dimensions array) and returning the cost for each particle (1-dimensionnal array) kwargs : dict containing parameters to change in the class (:class:``PSO_optimizer``) Returns ------- optim_results results of the optimization """ # self.__mproc_handling() results = super().minimize(f_swarm, **kwargs) verbose = parameters.get_nrv_verbosity() > 2 # initial position if not np.iterable(self.init_pos): self.init_pos = None # create pyswarms bounds psbounds = self.__nrv2pyswarms_bounds() if self.opt_type.lower() == "local": if not self.options or len(self.options) < 5: self.options = {"c1": 0.5, "c2": 0.5, "w": 0.5, "k": 5, "p": 2} topology = Ring(static=self.static) # Check number of pcu else: if not self.options: self.options = {"c1": 0.5, "c2": 0.5, "w": 0.5} topology = Star() # Initialize results directory if self.opt_type.lower() != "global": results["optimization_parameters"]["static"] = self.static if self.comment is not None: results["comment"] = self.comment if self.save_results: with open(self.saving_file, "w") as outfile: json.dump(results, outfile) if self.ftol is None: ## As np.inf cannot be encode by json encoding ftol is used as a local varialble ftol = -np.inf self.ftol_iter = 1 else: ftol = self.ftol try: # Call instance of PSO optimizer = ps.single.general_optimizer.GeneralOptimizerPSO( n_particles=self.n_particles, dimensions=self.dimensions, options=self.options, oh_strategy=self.oh_strategy, bounds=psbounds, ftol=ftol, ftol_iter=self.ftol_iter, init_pos=self.init_pos, bh_strategy=self.bh_strategy, topology=topology, ) optimizer.rep = set_log_level("WARNING", clear_log_file=True) __lab = "PSO optimizer" if self.n_processes is None or self.n_processes == 1: __lab += " - 1 proc" else: __lab += f" - {self.n_processes} procs" optimizer.name = __lab # Perform optimization t0 = perf_counter() cost, pos = optimizer.optimize( f_swarm, iters=self.maxiter, n_processes=None, verbose=verbose, ) t_opt = perf_counter() - t0 pos_history = np.array(optimizer.pos_history) self.nit = np.shape(pos_history[:, 0, :])[0] if self.print_time: print( "the cumputing time for ", self.nit, " iterations and ", self.n_particles, " particles is : ", t_opt, "s", ) # Save results results["nit"] = self.nit results["nfev"] = self.nit * self.n_particles results["status"] = "Completed" results["best_position"] = pos.tolist() results["x"] = results["best_position"] results["best_cost"] = cost results["optimization_time"] = t_opt results["cost_history"] = optimizer.cost_history results["neighbor_bestc"] = optimizer.mean_neighbor_history results["personal_bestc"] = optimizer.mean_pbest_history velocity_history = np.array(optimizer.velocity_history) pos_history = np.array(optimizer.pos_history) for i in range(self.n_particles): results["position" + str(i + 1)] = pos_history[:, i, :].tolist() results["velocity" + str(i + 1)] = velocity_history[:, i, :].tolist() if self.save_results: with open(self.saving_file, "w") as outfile: json.dump(results, outfile) except KeyboardInterrupt: results["status"] = "Interrupted" if self.save_results: with open(self.saving_file, "w") as outfile: json.dump(results, outfile) rise_error(KeyboardInterrupt, "Interuption raised during PSO") except: results["status"] = "Failed" results["Error_from_prompt"] = traceback.format_exc() print(results["Error_from_prompt"]) if self.save_results: with open(self.saving_file, "w") as outfile: json.dump(results, outfile) return results