Source code for nrv.optim._Problems
"""
NRV-:class:`.Problem` handling.
"""
import numpy as np
import faulthandler
import traceback
from rich.progress import (
Progress,
SpinnerColumn,
BarColumn,
TextColumn,
TimeRemainingColumn,
)
from ..backend._parameters import parameters
from ..backend._NRV_Class import NRV_class
from ..backend._NRV_Mproc import get_pool
from ..backend._log_interface import rise_error, pass_debug_info, set_log_level
from .optim_utils._OptimResults import optim_results
from ._CostFunctions import cost_function
from ._Optimizers import Optimizer
import sys
# enable faulthandler to ease 'segmentation faults' debug
faulthandler.enable()
[docs]
class Problem(NRV_class):
"""
Problem Class
A class to describe problems that should be optimized with the NRV Framework.
The problem should be described with a simulation and a cost, using the object
cost_function, and various optimization algorithms can be used to find optimal solution.
This class is abstract and is not supposed to be used directly by the end user. NRV can
handle two types of problems:
- problems where a geometric parameter can be optimized: please refer to ...
- problems where the waveform can be optimized: please refer to ...
"""
[docs]
def __init__(
self,
cost_function: cost_function = None,
optimizer: Optimizer = None,
save_problem_results: bool = False,
problem_fname: str = "optim.json",
n_proc: int = None,
):
"""
Initialize an optimization problem coupling a cost function and an optimizer.
Parameters
----------
cost_function : cost_function | None, optional
Cost function to minimize.
optimizer : Optimizer | None, optional
Optimization backend.
save_problem_results : bool, optional
If ``True``, save optimization results after completion.
problem_fname : str, optional
Output filename used when saving problem results.
n_proc : int | None, optional
Number of worker processes used for multiprocessing-enabled paths.
"""
super().__init__()
self._CostFunction = cost_function
self._Optimizer = optimizer
# For cases where optimisation is done on a swarm(groupe) of particle
self.swarm_optimizer = False
# self._SwarmCostFunction = None
self.save_problem_results = save_problem_results
self.problem_fname = problem_fname
self.mp_type = None
self.n_proc = n_proc or parameters.optim_Ncores
# Handling the cost_function attribute
@property
def costfunction(self) -> cost_function:
"""
Cost function of a Problem,
the cost function should be a CosFunction object, it should return a scalar.
NRV function should be prefered
"""
return self._CostFunction
@costfunction.setter
def costfunction(self, cf: cost_function):
"""
Set the cost function used by the optimization problem.
Parameters
----------
cf : cost_function
Cost function to assign.
"""
# need to add a verification that the cost function is a scallar and so on
self._CostFunction = cf
@costfunction.deleter
def costfunction(self):
"""
Remove the current cost function.
"""
self._CostFunction = None
def _SwarmCostFunction(self, swarm):
"""
Evaluate the cost of every particle in a swarm.
Parameters
----------
swarm : np.ndarray
Swarm array of shape ``(n_particles, n_dimensions)``.
Returns
-------
np.ndarray
Cost value for each particle.
"""
s_l = len(swarm)
costs = np.zeros((s_l))
if self.mp_type == "costfunction":
for i in range(s_l):
particle = swarm[i][:]
costs[i] = self._CostFunction(particle)
else:
# LR: This still generate PETSC errors (not crashing the script tho). Adding pool.close()/pool.join() crashes everything however
with get_pool(n_jobs=self.n_proc) as pool:
for i_c, cost in enumerate(pool.imap(self._CostFunction, swarm)):
costs[i_c] = cost
return costs
[docs]
def compute_cost(self, X):
"""
Evaluate the scalar cost at one position vector.
Parameters
----------
X : np.ndarray
Candidate parameter vector.
Returns
-------
float
Cost value.
"""
return self._CostFunction(X)
# Handling the Optimizer attribute
@property
def optimizer(self):
"""
Optimizer of the problem,
the Optimizer should be an Optimizer object. It has reference to optimization
methods and constraints
"""
return self._Optimizer
@optimizer.setter
def optimizer(self, optim: Optimizer):
"""
Set the optimizer backend used by the problem.
Parameters
----------
optim : Optimizer
Optimizer instance to assign.
"""
self._Optimizer = optim
self.swarm_optimizer = self._Optimizer.swarm_optimizer
@optimizer.deleter
def optmizer(self):
"""
Placeholder deleter for the optimizer property.
"""
# self._Optimizer = None
pass
# Call method is where the magic happens
def __call__(self, **kwargs) -> optim_results:
"""
Perform the optimization: minimze the `cost_function` using `optmizer`
Parameters
----------
kwargs
containing parameters of the optimizer to change
Returns
-------
optim_results
results of the optimization
Raises
------
KeyboardInterrupt
"""
try:
kwargs = self.__update_saving_parameters(**kwargs)
if self.mp_type is None:
self.set_multiprocess_type()
if not self.swarm_optimizer:
results = self._Optimizer(self._CostFunction, **kwargs)
else:
results = self._Optimizer(self._SwarmCostFunction, **kwargs)
results["status"] = "Completed"
except KeyboardInterrupt:
raise KeyboardInterrupt
except:
results["status"] = "Error"
rise_error(traceback.format_exc())
set_log_level("INFO")
if self.save_problem_results:
results.save(save=True, fname=self.problem_fname)
return results
# Mcore handling
def __check_m_proc_CostFunction(self):
"""
check if a cost funciton can be parallelized
"""
if isinstance(self._CostFunction, cost_function):
return self._CostFunction.is_m_proc_func
else:
return False
[docs]
def set_multiprocess_type(self, costfunction_mp=True, n_core=None):
"""
Set if multiprocessing should be applied to the optimizaiton or the CostFunction simulation
Warning
-------
For now, only costfunction can be parallelized. This will be improve in the future
"""
if n_core is not None:
self.n_proc = n_core
# parallelizable optimizer
if "n_processes" in self._Optimizer.__dict__:
if self.__check_m_proc_CostFunction() and costfunction_mp:
# * To add number of n_core_fascicle = n_core
self._Optimizer.n_processes = None
self.mp_type = "costfunction"
else:
# * To add number of n_core_fascicle = 1
#!! Bug cannot compute local method generated from cost_function_swarm_from_particle
self._Optimizer.n_processes = self.n_proc
#!!self._Optimizer.n_processes = None
self.mp_type = "optimizer"
else:
# * To add number of n_core_fascicle = n_core
self.mp_type = "costfunction"
# additional methods
def __update_saving_parameters(self, **kwargs):
"""
internal use only: update the results saving parameters
and remove the corresponding keys from kwargs
"""
if "save_problem_results" in kwargs:
self.save_problem_results = kwargs.pop("save_problem_results")
if "problem_fname" in kwargs:
self.problem_fname = kwargs.pop("problem_fname")
return kwargs
[docs]
def context_and_cost(self, context_func, cost_func, residual):
"""
Convenience helper to build and assign a :class:`cost_function`.
Parameters
----------
context_func : callable
Context-modifier callable.
cost_func : callable
Cost-evaluation callable.
residual : Any
Static simulation context.
"""
self.cost_function = cost_function(context_func, cost_func, residual)
[docs]
def autoset_optimizer(self):
"""
Placeholder for automatic optimizer selection.
"""
pass