import os
import sys
import time
from pathlib import Path
import numpy as np
import numpy.linalg as npl
import scipy.optimize as spo
from source_localization.gradient_descent import gradient_descent
from source_localization.proximal_gradient import proximal_gradient
[docs]
class Cost:
r"""
Class containing the cost function :math:`j`
to minimize with respect to the quantity of pheromone emitted by the insects :math:`s`, also refered to as the control variable.
The cost function is of the following form:
.. math::
j(s)=\|m\left(c(s)\right)-m_{obs}\|_{\mathbf{R}^{-1}}^2 + \sum_{i} \alpha_{reg,i} j_{reg,i}(s)
with :math:`\|m\left(c(s)\right)-m_{obs}\|_{\mathbf{R}^{-1}}^2` the term of discrepancy between the data :math:`m_{obs}`
and the estimate computed using the pheromone propagation model :math:`m\left(c(s)\right)`,
:math:`j_{reg,i}` some regularization terms based on biological information and
:math:`\alpha_{reg,i}` the associated weight coefficient.
This class contains several features to construct such cost function and to solve the optimization such as:
- the observation and regularization terms,
- methods to add, modify and remove regularization terms,
- methods to compute the gradient of each differentiable parts,
- methods to compute the proximal operator of each non-differentiable parts,
- a method to minimize the cost function.
Attributes
----------
msh : ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
obs: ~source_localization.obs.Obs
Object containing all the features related to the observations and estimation of the observed variables.
ctrl : ~source_localization.control.Control
Object containing the control variable :math:`s`, its value and all its features.
j_obs : float
The current value of the observations term of the cost function :math:`\|m\left(c(s)\right)-m_{obs}\|_{\mathbf{R}^{-1}}^2`.
implemented_regularization_types: list of str
The list of the keywords of the implemented type of regularization terms.
This list of keyword is `['Tikhonov', 'Population dynamic', 'LASSO', 'time group LASSO', 'logarithm barrier']`.
Let us note that the regularization terms `'Tikhonov'`, `'Population dynamic'` and `'logarithm barrier'` are differentiable,
while the regularization terms `'LASSO'` and `'time group LASSO'` are non-differentiable
with respect to the control variable :math:`s`.
regularization_types : list of str
The list of the keywords of the type of regularization terms considered.
If no regularization terms are considered, initialized to an empty list.
alpha : dict of float
The weight coefficients of the regularization terms :math:`\alpha_{reg,i}`.
This dictionnary links the keywords of the type of regularization considered to the associated weight coefficient.
If no regularization terms are considered, initialized to an empty dictionnary.
j_reg : dict of float
The current value of the regularization terms :math:`j_{reg,i}`.
This dictionnary links the keywords of the type of regularization considered to the value of the regularization term.
If no regularization terms are considered, initialized to an empty dictionnary.
"""
[docs]
def __init__(self, msh, obs, ctrl, regularization_types=None, alpha=None):
r"""
Constructor method
Parameters
----------
msh: ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
obs: ~source_localization.obs.Obs
Object containing all the features related to the observations and estimation of the observed variables.
ctrl : ~source_localization.control.Control
Object containing the control variable :math:`s` and all its features.
regularization_types: list of string, string or None, optional, default: None
The keyword or list of the keywords of the type of regularization terms considered.
If `None`, no regularization terms are considered.
alpha: dic of float, float or None, optional, default: None
The weight coefficient or dictionnary of weight coefficients :math:`\alpha_{reg,i}`.
If `None`, no regularization terms are considered.
Raises
------
ValueError
if the keywords of the type of regularization terms considered
do not coincide with the keywords of the dictionnary of weight coefficients
or are not in the list of the keywords of the implemented type of regularization terms.
"""
self.obs = obs
self.ctrl = ctrl
self.msh = msh
self.implemented_regularization_types = [
'Tikhonov',
'Population dynamic',
'LASSO',
'time group LASSO',
'logarithm barrier',
]
if regularization_types is None:
self.regularization_types = []
elif isinstance(regularization_types, str):
self.regularization_types = [regularization_types]
elif isinstance(regularization_types, list):
self.regularization_types = regularization_types
else:
raise ValueError(
"The given types of regularization term are neither None (no regularization term), nor a string or a list of string."
)
if not all(isinstance(regularization_type, str) for regularization_type in self.regularization_types):
raise ValueError("One of the given types of regularization term is not a string.")
for regularization_type in self.regularization_types:
if regularization_type not in self.implemented_regularization_types:
raise ValueError(
"The "
+ regularization_type
+ " regularization term has not been implemented."
+ " It should be on of these: "
+ ','.join(self.implemented_regularization_types)
)
if alpha is None:
self.alpha = {}
elif isinstance(alpha, float):
if len(self.regularization_types) != 1:
raise ValueError(
"The number of regularization terms in the list regularization_types is different "
+ "than the number of weight coefficients in the dictionnary alpha."
)
else:
self.alpha = {self.regularization_types[0]: alpha}
elif isinstance(alpha, dict):
self.alpha = alpha
else:
raise ValueError("The given weight coefficients are neither None (no regularization term), nor a float or a dictionnary.")
if not len(list(self.alpha.keys())) == len(self.regularization_types):
raise ValueError(
"The number of regularization terms in the list regularization_types is different "
+ "than the number of weight coefficients in the dictionnary alpha."
)
keys_to_remove = []
for key in self.alpha.keys():
if key not in self.regularization_types:
raise ValueError(
"The types of regularization term given in the list regularization_types are different"
+ " than the types of regularization term given as key of the dictionnary alpha."
+ " The "
+ key
+ "regularization term is found in the dictionnary alpha but not in the list regularization_types"
)
if not isinstance(self.alpha[key], float):
raise ValueError("The weight coefficient related to the " + key + " regularization term is not a float.")
if self.alpha[key] == 0.0:
keys_to_remove.append(key)
for key in keys_to_remove:
del self.alpha[key]
self.regularization_types.remove(key)
print(
"Warning: the regularization "
+ key
+ "has a weight coefficient equals to 0."
+ " Hence, this regularization term is not considered "
+ "and the key removed from the list regularization_types and the dictionary alpha."
)
self.j = 0
self.j_obs = 0
self.j_reg = {}
for regularization_type in self.regularization_types:
self.j_reg[regularization_type] = 0.0
[docs]
def add_reg(self, regularization_type, alpha):
r"""
Add a regularization term :math:`j_{reg,i}` to the cost function :math:`j`.
The addition of a regularization term is done in the following steps:
- update the attribute :attr:`regularization_types` by stacking the keyword of the type of regularization to add,
- update the attribute :attr:`alpha` by adding the additional keyword and the associated weight coefficient,
- update the attribute :attr:`j_reg` by adding the additional keyword and initialize the associated value with `0`.
Parameters
----------
regularization_type: string
The keyword of the type of regularization to add.
alpha: float
The associated weight coefficient :math:`\alpha_{reg,i}`.
Raises
------
ValueError
if the keyword of the type of regularization terms to additionnaly considered
is not in the list of the keywords of the implemented type of regularization terms.
Notes
-----
If the weight coefficient of the additional regularization term is `0` (:math:`\alpha_{reg,i}=0`),
then the regularization term is not added and nothing is done.
If the regularization term was already considered,
then the method only updates the associated weight coefficient :math:`\alpha_{reg,i}`.
"""
# To do:
# add exceptions to make sure the input has a valid type
# possibility to give a list of regularizations to add?
# add case where regularization_type is already in regularizations_type`
if regularization_type not in self.implemented_regularization_types:
raise ValueError(
"The "
+ regularization_type
+ " regularization term has not been implemented."
+ " It should be on of these: "
+ ','.join(self.implemented_regularization_types)
)
if alpha == 0:
print(
"Warning: the regularization term is to be added with a weight coefficient set to 0."
+ " Therefore, this regularization term is not considered and not added."
)
else:
if regularization_type in self.regularization_types:
print("Warning: the regularization term is already considered. The associated weight coefficient is still updated")
self.modify_weight_reg(regularization_type, alpha)
else:
self.regularization_types.append(regularization_type)
self.j_reg[regularization_type] = 0.0
self.alpha[regularization_type] = alpha
[docs]
def modify_weight_reg(self, regularization_type, alpha):
r"""
Modify the weight coefficient :math:`\alpha_{reg,i}` for a given type of regularization term :math:`j_{reg,i}`.
Parameters
----------
regularization_type: string
The keyword of the type of regularization whose weigth coefficient needs to be changed.
alpha: float
The new value of the weight coefficient :math:`\alpha_{reg,i}`.
Raises
------
ValueError
if the keyword of the type of regularization terms whose weigth coefficient needs to be changed
is not in the list of the keywords of the implemented type of regularization terms.
ValueError
if the keyword of the type of regularization terms whose weigth coefficient needs to be changed
is not in the list of the keywords of the type of regularization terms considered.
Notes
-----
If the new value of the weight coefficient is `0` (:math:`\alpha_{reg,i}=0`), then the regularization term is removed.
"""
# To do:
# add exceptions to make sure the input has a valid type
# possibility to give a list of regularizations to modify?
if regularization_type not in self.implemented_regularization_types:
raise ValueError(
"The "
+ regularization_type
+ " regularization term has not been implemented."
+ " It should be on of these: "
+ ','.join(self.implemented_regularization_types)
)
if regularization_type not in self.regularization_types:
raise ValueError(
"The "
+ regularization_type
+ " regularization term is not considered up to now."
+ " These one are considered and therefore can be modified: "
+ ','.join(self.regularization_types)
)
self.alpha[regularization_type] = alpha
if alpha == 0:
print(
"Warning: the weight coefficient associated to the regularization "
+ regularization_type
+ " is set to 0."
+ " Hence, this regularization term is not considered "
+ "and the key removed from the list regularization_types and the dictionary alpha."
)
self.remove_reg(regularization_type)
[docs]
def remove_reg(self, regularization_type):
r"""
Remove a regularization term :math:`j_{reg,i}` from the cost function :math:`j`.
The addition of a regularization term is done in the following steps:
- update the attribute :attr:`regularization_types` by removing the keyword of the type of regularization to remove,
- update the attribute :attr:`alpha` by deleting the keyword and the associated weight coefficient,
- update the attribute :attr:`j_reg` by deleting the keyword and the associated value of :math:`j_{reg,i}`.
Parameters
----------
regularization_type: string
The keyword of the type of regularization to remove.
Raises
------
ValueError
if the keyword of the type of regularization terms to remove is
not in the list of the keywords of the implemented type of regularization terms.
Notes
-----
If the regularization term to remove was not considered, then the method does nothing.
"""
# To do:
# add exceptions to make sure the input has a valid type
# possibility to give a list of regularizations to modify?
if regularization_type not in self.implemented_regularization_types:
raise ValueError(
"The "
+ regularization_type
+ " regularization term has not been implemented."
+ " It should be on of these: "
+ ','.join(self.implemented_regularization_types)
)
if regularization_type not in self.regularization_types:
print("Warning: the regularization " + regularization_type + "is not considered up to now. Thus, there is nothing to remove")
else:
self.regularization_types.remove(regularization_type)
del self.j_reg[regularization_type]
del self.alpha[regularization_type]
[docs]
def reg_cost(self):
r"""
Compute the regularization terms :math:`j_{reg,i}` of the cost function
and update the attribute :attr:`j_reg` accordingly.
Raises
------
ValueError
if a regularization term is considered but can not be computed due to parameters with incorrect or missing parameters.
"""
if 'Tikhonov' in self.regularization_types:
self.j_reg['Tikhonov'] = (
self.alpha['Tikhonov']
* self.msh.mass_cell
* self.msh.dt
* (self.ctrl.value - self.ctrl.background_value).dot(self.ctrl.C_inv.dot(self.ctrl.value - self.ctrl.background_value))
)
if 'Population dynamic' in self.regularization_types:
if self.ctrl.population_dynamic_model is None:
raise ValueError(
"The population dynamic model has not been specified and is None by default."
+ "Thus the population dynamic-informed regularization term cannot be computed."
)
self.j_reg['Population dynamic'] = (
self.alpha['Population dynamic']
* self.msh.mass_cell
* self.msh.dt
* npl.norm(self.ctrl.population_dynamic_model.matvec(self.ctrl.value)) ** 2
)
if 'LASSO' in self.regularization_types:
self.j_reg['LASSO'] = self.alpha['LASSO'] * self.msh.mass_cell * self.msh.dt * npl.norm(self.ctrl.value, 1)
if 'time group LASSO' in self.regularization_types:
self.j_reg['time group LASSO'] = (
self.alpha['time group LASSO']
* self.msh.mass_cell
* np.sum(
np.array(
[
npl.norm(self.ctrl.value.reshape((self.msh.t_array.size, self.msh.y.size * self.msh.x.size))[:, i])
* np.sqrt(self.msh.dt)
for i in range(self.msh.y.size * self.msh.x.size)
]
)
)
)
if 'logarithm barrier' in self.regularization_types:
self.j_reg['logarithm barrier'] = (
self.alpha['logarithm barrier']
* self.msh.mass_cell
* self.msh.dt
* np.sum(
-self.ctrl.log_barrier_threshold
* np.log(self.ctrl.log_barrier_threshold - self.ctrl.value[self.ctrl.exclusion_domain] ** 2)
)
)
[docs]
def obs_cost(self):
r"""
Compute the observation term :math:`\|m\left(c(s)\right)-m_{obs}\|_{\mathbf{R}^{-1}}^2` of the cost function
and update the attribute :attr:`j_obs` accordingly.
Raises
------
ValueError
if the observed variable has not been estimated.
"""
if np.array([np.isnan(self.obs.d_est[i]) for i in range(self.obs.N_obs)]).any():
raise ValueError(
"The vector of estimation of the observed variable has not been computed."
+ " The method observation operator should be used to computed this vector."
)
self.j_obs = (
self.msh.mass_cell * self.msh.dt * (self.obs.d_est - self.obs.d_obs).dot(self.obs.R_inv.dot(self.obs.d_est - self.obs.d_obs))
)
[docs]
def objectif(self):
r"""
Evaluate the cost objectif function
.. math::
J(s,m) =\|m-m_{obs}\|_{\mathbf{R}^{-1}}^2 + \sum_{i} \alpha_{reg,i} j_{reg,i}(s)
for the current value of the control :math:`s` and of the observed variable :math:`m`
Call the methods :meth:`obs_cost` and :meth:`reg_cost`
and update the attribute :attr:`j` accordingly.
"""
self.obs_cost()
self.reg_cost()
self.j = self.j_obs
for regularization_type in self.regularization_types:
self.j += self.j_reg[regularization_type]
[docs]
def gradient_objectif_wrt_d(self):
r"""
Compute the gradient of the objectif function
:math:`J(s,m) =\|m-m_{obs}\|_{\mathbf{R}^{-1}}^2 + \sum_{i} \alpha_{reg,i} j_{reg,i}(s)`
with respect to the observed variable :math:`m`
for the current estimate of the observed variable:
.. math::
\nabla_m J(s,m) =2 R^{-1}(m-m_{obs})
Returns
-------
~numpy.ndarray
evaluation of the gradient :math:`\nabla_m J(s,m)`
Raises
------
ValueError
if the observed variable has not been estimated.
"""
if np.array([np.isnan(self.obs.d_est[i]) for i in range(self.obs.N_obs)]).any():
raise ValueError(
"The vector of estimation of the observed variable has not been computed."
+ " The method observation operator should be used to computed this vector."
)
return 2 * self.obs.R_inv.dot(self.obs.d_est - self.obs.d_obs)
[docs]
def gradient_objectif_wrt_S(self):
r"""
Compute the gradient of the differentiable part objectif function
:math:`J(s,m) =\|m-m_{obs}\|_{\mathbf{R}^{-1}}^2 + \sum_{i} \alpha_{reg,i} j_{reg,i}(s)`
with respect to the control variable :math:`s`
for the current value of the control variable:
.. math::
\nabla_s J(s,m) = \sum_{i} \alpha_{reg,i} \nabla j_{reg,i}(s)
Returns
-------
~numpy.ndarray
evaluation of the gradient :math:`\nabla_s J(s,m)`
Raises
------
ValueError
if a regularization term is considered but can not be computed due to parameters with incorrect or missing parameters.
"""
grad = np.zeros(
self.msh.t_array.size * self.msh.x.size * self.msh.y.size,
)
if 'Tikhonov' in self.regularization_types:
grad += self.alpha['Tikhonov'] * 2 * self.ctrl.C_inv.dot(self.ctrl.value - self.ctrl.background_value)
if 'Population dynamic' in self.regularization_types:
if self.ctrl.population_dynamic_model is None:
raise ValueError(
"The population dynamic model has not been specified and is None by default."
+ "Thus the gradient of the population dynamic-informed regularization term cannot be computed."
)
grad += (
self.alpha['Population dynamic']
* 2
* self.ctrl.population_dynamic_model.rmatvec(self.ctrl.population_dynamic_model.matvec(self.ctrl.value))
)
if 'logarithm barrier' in self.regularization_types:
grad += (
self.alpha['logarithm barrier']
* 2
* self.ctrl.log_barrier_threshold
* np.where(self.ctrl.exclusion_domain, 1, 0)
/ (self.ctrl.log_barrier_threshold - self.ctrl.value**2)
)
return grad
[docs]
def reg_proximal_operator(self, s, lam):
r"""
Evaluate the proximal operator :math:`prox_\lambda`
of the non-differentiable part objectif function
:math:`J(s,m) =\|m-m_{obs}\|_{\mathbf{R}^{-1}}^2 + \sum_{i} \alpha_{reg,i} j_{reg,i}(s)`
for a given value of the control variable :math:`s` and
a given parameter of the proximal operator :math:`\lambda`.
Parameters
----------
s: ~numpy.ndarray
The value of the control variable :math:`s` in which the proximal operator is evaluated.
lam: float
The parameter of the proximal operator :math:`\lambda`.
Returns
-------
prox: ~numpy.ndarray
The evaluation of the proximal operator raveled into a vector.
Notes
-----
For now, only one non-differentiable regularization term can be considered at a time.
Raises
------
ValueError
if no regularization terms are non-differentiable.
"""
# To do:
# add exceptions to check the size and type of inputs
if 'LASSO' in self.regularization_types:
prox = np.array(
[
np.multiply(np.sign(s[j]), np.max([abs(s[j]) - lam * self.alpha['LASSO'] * self.msh.mass_cell * self.msh.dt, 0]))
for j in range(s.size)
]
)
# prox = np.zeros((s.size,))
# for j in range(s.size):
# prox[j] = np.multiply(np.sign(s[j]), np.max([abs(s[j]) - lam * self.alpha['LASSO'] * self.msh.mass_cell * self.msh.dt, 0]))
elif 'time group LASSO' in self.regularization_types:
s_reshape = s.reshape((self.msh.t_array.size, self.msh.y.size * self.msh.x.size))
prox = np.array(
[
s_reshape[:, i]
* np.maximum(
1 - lam * self.alpha['time group LASSO'] * self.msh.mass_cell / (npl.norm(s_reshape[:, i]) * np.sqrt(self.msh.dt)),
0,
)
for i in range(self.msh.y.size * self.msh.x.size)
]
).T.reshape((self.msh.t_array.size * self.msh.y.size * self.msh.x.size,))
else:
raise ValueError(
"The proximal operator is not implemented for any of these regularization terms."
+ " If all the regularization terms are differentiable, there is no need of the proximal operator,"
+ " classical gradient descent algorithm should be used instead."
)
return prox
[docs]
def cost_and_gradient(self, s, direct_model, adjoint_model, display=False):
r"""
Compute the cost function :math:`j` and its gradient :math:`\nabla j`
for a given value of the control variable :math:`s`.
Parameters
----------
s: ~numpy.ndarray
The value of the control variable :math:`s` in which :math:`j` and :math:`\nabla j` are evaluated.
direct_model: ~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation
The pheromone propagation model that enables to estimate the observed variable :math:`m\left(c(s)\right)`.
adjoint_model: ~source_localization.adjoint_convection_diffusion_2D.AdjointDiffusionConvectionReaction2DEquation
The adjoint model that enables to compute the adjoint state :math:`c^*(s)`.
display: boolean, optional, default: False
If True, print the progress of the computations throught the different steps
Returns
-------
j: float
The evaluation of the cost function :math:`j(s)`.
grad_j: ~numpy.ndarray
The evaluation of the gradient of the cost function :math:`\nabla j(s)=\sum_{i} \alpha_{reg,i} \nabla j_{reg,i}(s)-c^*(s)`.
"""
# set the value of the control to the one provided as input
self.ctrl.value = np.copy(s) # NECESSARY???
# set the value of the source term of the direct model to the value of the control
self.ctrl.apply_control(direct_model)
# solve the direct model with this value of the source term
# and store the estimate of the state variable needed to compute the estimate of the observed variable
if display:
sys.stdout.write('\rrunning the direct model\n')
sys.stdout.flush()
direct_model.solver_est_at_obs_times(self.obs, display_flag=False)
# compute the estimate of the observed variable
if display:
sys.stdout.write('\rrunning the observation operator\n')
sys.stdout.flush()
self.obs.obs_operator()
# compute the adjoint state
if display:
sys.stdout.write('\rrunning the adjoint model\n')
sys.stdout.flush()
p = adjoint_model.solver(self, display_flag=False)
# compute the cost function for the current value of the control
self.objectif()
# compute the gradient of the cost function for the current value of the control
grad_j = self.gradient_objectif_wrt_S() - p
return self.j, grad_j
[docs]
def minimize(
self,
direct_model,
adjoint_model,
method,
j_obs_threshold=-1,
s_init=None,
options={},
path_save=None,
restart_flag=False,
):
r"""
Minimize the cost function :math:`j`
Parameters
----------
direct_model : ~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation
The pheromone propagation used to compute the estimate of the observation variable :math:`m\left(c(s)\right)`.
adjoint_model : ~source_localization.adjoint_convection_diffusion_2D.AdjointDiffusionConvectionReaction2DEquation
The adjoint model associated to the pheromone propagation model and used to compute the gradient :math:`\nabla j`.
method : str
keyword of the method used to minimize the cost function.
Can be either `'gradient descent'`, `'proximal gradient'`, or `'L-BFGS-B'`.
- With the `'gradient descent'` keyword,
the method uses the algorithm implemented
in the module :mod:`~source_localization.gradient_descent`.
This method should be used only when :math:`j` is differentiable.
- With `'proximal gradient'` keyword,
the method uses the algorithm implemented
in the module :mod:`~source_localization.proximal_gradient`.
This method should be used only when :math:`j` is non-differentiable.
- With the `'L-BFGS-B'` keyword,
the method uses the L-BFGS-B algorithm implemented
in the function :func:`~scipy.optimize.minimize`
of the module :mod:`~scipy.optimize`.
This method should be used only when :math:`j` is differentiable.
j_obs_threshold : float, optional, default: -1
Threshold :math:`\epsilon` such that the algorithm terminates successfully
when :math:`\|m-m_{obs}\|_{\mathbf{R}^{-1}}^2<\epsilon`.
If has a negative value, this termination criteria is not considered
as :math:`\|m-m_{obs}\|_{\mathbf{R}^{-1}}^2` is always positive.
s_init : ~numpy.ndarray, optional, default: None
Value for the control :math:`s_0` used as starting point of the iterative optimization algorithm.
If no initial value are provided, the optimization algorithm starts with :math:`s_0=0`.
options : dict, optional, default: {}
Dictionary containing several options to customize the optimization method.
path_save : str or None, optional, default: None
Path to the folder in which the results are saved. If `None`, the results are not saved.
restart_flag : bool, optional, default: False
If `True`, then the optimization algorithm is restarted (hot start)
using the results found in the folder which path is given by `path_save`.
Returns
-------
direct_model : ~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation
The pheromone propagation model with the optimal source term.
j_obs : ~numpy.ndarray
The values of the observation term :math:`\|m\left(c(s)\right)-m_{obs}\|_{\mathbf{R}^{-1}}^2`
at each iterations of the optimization process.
j_reg : dic of ~numpy.ndarray
The dictionnary containing the values of the regularization terms :math:`j_{reg,i}`
at each iterations of the optimization process.
s_a : ~numpy.ndarray
The optimal value of the control.
Raises
------
ValueError
If the directory corresponding to the given path and needed to restart the optimization process does not exist.
ValueError
If one of the files needed to restart the optimization process does not exist in the given directory.
ValueError
If `method` is `'L-BFGS-B'` or `'gradient descent'`
and the cost function has a LASSO or group-LASSO regularization term,
making the cost function non-differentiable.
ValueError
If `method` is `'proximal gradient'` and
no LASSO or group LASSO regularization terms are present,
making the cost function differentiable.
ValueError
If `method` is not the keyword of one of the implemented methods
(`'gradient descent'`, `'proximal gradient'`, `'L-BFGS-B'`).
"""
# To do:
# ensure that option is a dic
# Simplify and clarify the initialization and restart process.
# Simplify and clarify the save_output method.
# improve callback function
# set the iteration number, the outputs vector and the initial cost as global variable so they can be used in the callback function
global iteration, j_obs, j_reg, j_0, t_i, traj_integrated_s, sum_step_size
if "step size" not in options.keys():
options["step size"] = 1.0
# initialize the iteration number and the outputs vector
# if restart_flag is False, initialize with empty object or 0
if not restart_flag:
iteration, nb_iteration_init, direct_model, j_obs, j_reg, j_0, traj_integrated_s, sum_step_size = self.cold_start(
s_init, direct_model, path_save
)
# if restart_flag is True, then load the results from the previous optimization
else:
iteration, nb_iteration_init, direct_model, j_obs, j_reg, j_0, traj_integrated_s, sum_step_size = self.hot_start(
direct_model, path_save
)
# define the callback function, update the iteration number and the outputs vector, and display the results at the current iteration
def callback_fct(S):
"""define the callback function, update the iteration number and the outputs vector,
and display the results at the current iteration"""
global iteration, t_i, traj_integrated_s, sum_step_size
j_obs.append(self.j_obs)
iteration += 1
for regularization_type in self.regularization_types:
j_reg[regularization_type].append(self.j_reg[regularization_type])
if self.j_obs < j_obs_threshold:
print('--- The cost of the observations is below the threshold. Thus, the optimization algorithm is terminated ---')
self.save_output(
iteration, j_0, path_save, traj_integrated_s, sum_step_size, j_obs, j_reg, restart_flag, nb_iteration_init, options
)
sys.exit()
sys.stdout.write(
f'- \riteration {iteration}: j = {"{:.6e}".format(self.j)}'
+ f', j_obs = {"{:.6e}".format(self.j_obs)}'
+ f', ite comp time = {"{:.3e}".format(time.time()-t_i)}s -\n'
)
sys.stdout.flush()
traj_integrated_s += options["step size"] * S
sum_step_size += options["step size"]
t_i = time.time()
t_i = time.time()
# minimization using the L-BFGS-B method
# this method is not efficient for now
if method == 'L-BFGS-B':
if 'LASSO' in self.regularization_types:
raise ValueError(
"The cost function is not differentiable du to a LASSO regularization term"
"Hence, the L-BFGS-B algorithm can not be used." + " The proximal-gradient algorithm should be used instead."
)
sys.stdout.write('Optimizing using the L-BFGS-B algorithm \n')
sys.stdout.flush()
bounds = np.zeros((np.size(self.ctrl.value), 2))
bounds[:, 1] = +np.inf
res = spo.minimize(
self.cost_and_gradient,
self.ctrl.value,
args=(direct_model, adjoint_model),
jac=True,
method='L-BFGS-B',
options=options,
callback=callback_fct,
bounds=bounds,
)
s_a = np.copy(res.x)
# minimization using the gradient descent method
elif method == "gradient descent":
if 'LASSO' in self.regularization_types or 'time group LASSO' in self.regularization_types:
raise ValueError(
"The cost function is not differentiable du to a LASSO regularization term"
"Hence, the gradient descent algorithm can not be used." + " The proximal-gradient algorithm should be used instead."
)
sys.stdout.write('Optimizing using the gradient descent algorithm \n')
sys.stdout.flush()
s_a, _, _, _ = gradient_descent(
self.cost_and_gradient,
self.ctrl.value,
args=(direct_model, adjoint_model),
options=options,
callback=callback_fct,
)
# minimization using the gradient descent method
elif method == "proximal gradient":
if 'LASSO' not in self.regularization_types and 'time group LASSO' not in self.regularization_types:
raise ValueError(
"The proximal operator is not implemented for any of these regularization terms"
+ " or the weight coefficient of the LASSO regularization is 0."
+ " If all the regularization terms are differentiable, the proximal gradient method is not appropriate."
+ " Classical gradient descent algorithm should be used instead."
)
sys.stdout.write('Optimizing using the proximal gradient algorithm \n')
sys.stdout.flush()
s_a, _, _, _ = proximal_gradient(
self.ctrl.value,
self.cost_and_gradient,
self.reg_proximal_operator,
args=(direct_model, adjoint_model),
options=options,
callback=callback_fct,
)
else:
raise ValueError("The given type of method has not been implemented.")
# update the value of the control with its optimal value and apply it as souce term of the direct model
self.ctrl.value = s_a
self.ctrl.apply_control(direct_model)
self.save_output(
iteration, j_0, path_save, traj_integrated_s, sum_step_size, j_obs, j_reg, restart_flag, nb_iteration_init, options
)
return direct_model, j_obs, j_reg, s_a
def cold_start(self, s_init, direct_model, path_save):
# A short function to initialize all the variables required for the minimize method
# in case the optimization is started from scratch
# this function exists for sake of readability of the minimize method
sys.stdout.write('initialization of the optimization process, computing the initial cost\n')
sys.stdout.flush()
# if no initial value of the control is given
# then starts at 0
if s_init is None:
self.ctrl.value = np.zeros(self.ctrl.value.shape)
else:
self.ctrl.value = s_init
self.ctrl.apply_control(direct_model)
direct_model.solver_est_at_obs_times(self.obs, display_flag=False)
self.obs.obs_operator()
self.objectif()
j_0 = self.j
iteration = 0
nb_iteration_init = 0
j_obs = []
j_reg = {}
traj_integrated_s = np.zeros_like(self.ctrl.value)
sum_step_size = 0
for regularization_type in self.regularization_types:
j_reg[regularization_type] = []
if path_save is not None:
# if the save directory does not exist, then it is created
if not os.path.isdir(path_save):
os.makedirs(path_save)
return iteration, nb_iteration_init, direct_model, j_obs, j_reg, j_0, traj_integrated_s, sum_step_size
def hot_start(self, direct_model, path_save):
# A short function to initialize all the variables required for the minimize method
# in case the optimization is started from the results of the previous optimization
# this function exists for sake of readability of the minimize method
if not os.path.isdir(path_save):
raise ValueError("The directory corresponding to the given path does not exist.")
if not ((Path(path_save) / 'j_obs_vs_ite.npy').exists()):
raise ValueError("One of the files needed to restart the optimization processe does not exist in the the given directory.")
sys.stdout.write('restarting the optimization process from the data contained in the dir ' + str(path_save) + '\n')
sys.stdout.flush()
j_obs = np.load(Path(path_save) / 'j_obs_vs_ite.npy').tolist()
iteration = len(j_obs)
nb_iteration_init = len(j_obs)
j_reg = {}
j_0 = j_obs[0]
for regularization_type in self.regularization_types:
name = 'j_reg_' + regularization_type.replace(" ", "_") + '_vs_ite.npy'
if not (Path(path_save) / ('j_reg_' + regularization_type.replace(" ", "_") + '_vs_ite.npy')).exists():
j_reg[regularization_type] = np.zeros((len(j_obs),)).tolist()
print(
"Warning: the regularization "
+ regularization_type
+ " term has not been found in the given directory."
+ " It is assumed that this regularization term was not used previously and thus is set to 0."
)
else:
j_reg[regularization_type] = np.load(Path(path_save) / name).tolist()
if len(j_reg[regularization_type]) != iteration:
j_reg[regularization_type] += [0 for i in range(iteration - len(j_reg[regularization_type]))]
print(
"Warning: the regularization "
+ regularization_type
+ " term was not computed at every iterations."
+ " It is assumed that this regularization term was not used previously and thus is completed with 0."
)
j_0 += j_reg[regularization_type][0]
if not (Path(path_save) / ('traj_integrated_s.npy')).exists():
traj_integrated_s = np.zeros_like(self.ctrl.value)
print(
"Warning: trajectory integrated source term was not computed at every iterations."
+ " It is set to 0 and previous iteration should e added later."
)
else:
traj_integrated_s = np.load(Path(path_save) / 'traj_integrated_s.npy')
if not (Path(path_save) / ('sum_step_size.npy')).exists():
sum_step_size = 0
print(
"Warning: the sum of the step size was not computed at every iterations."
+ " It is set to 0 and previous iteration should e added later."
)
else:
sum_step_size = np.load(Path(path_save) / 'sum_step_size.npy')
self.ctrl.value = np.load(Path(path_save) / 'S_optim.npy')
self.ctrl.apply_control(direct_model)
# compute the initial cost
direct_model.solver_est_at_obs_times(self.obs, display_flag=False)
self.obs.obs_operator()
self.objectif()
return iteration, nb_iteration_init, direct_model, j_obs, j_reg, j_0, traj_integrated_s, sum_step_size
def save_output(
self, iteration, j_0, path_save, traj_integrated_s, sum_step_size, j_obs, j_reg, restart_flag, nb_iteration_init, options
):
# A short function to display the results at the final iteration, the exit mesage of the optimization algorithm
# and save the outpur
# this function exists for sake of readability of the minimize method
sys.stdout.write('\nfinal iteration:\n')
sys.stdout.write(f'number of iteration = {iteration}\n')
sys.stdout.write(f'j = {self.j/j_0} % of the initial cost\n')
sys.stdout.write(f'j_obs = {self.j_obs/j_0} % of the initial cost\n')
for key in self.j_reg.keys():
sys.stdout.write(f'j_{key} = {self.j_reg[key]/j_0} % of the initial cost\n')
sys.stdout.flush()
if path_save is not None:
# if the save directory does not exist, then it is created
if not os.path.isdir(path_save):
os.makedirs(path_save)
# Save the optimal value of the control two times
# to keep records in case of multiple restarts
np.save(Path(path_save) / 'S_optim', self.ctrl.value)
np.save(Path(path_save) / f'S_optim_ite_{"{0:04d}".format(iteration)}', self.ctrl.value)
np.save(Path(path_save) / 'traj_integrated_s.npy', traj_integrated_s)
np.save(Path(path_save) / f'traj_integrated_s_ite_{"{0:04d}".format(iteration)}.npy', traj_integrated_s)
np.save(Path(path_save) / 'sum_step_size.npy', sum_step_size)
np.save(Path(path_save) / f'sum_step_size_ite_{"{0:04d}".format(iteration)}.npy', sum_step_size)
np.save(Path(path_save) / 'j_obs_vs_ite.npy', j_obs)
for regularization_type in self.regularization_types:
name_alpha = 'alpha_' + regularization_type.replace(" ", "_") + '.npy'
name_j_reg = 'j_reg_' + regularization_type.replace(" ", "_") + '_vs_ite.npy'
np.save(Path(path_save) / name_alpha, self.alpha[regularization_type])
np.save(Path(path_save) / name_j_reg, j_reg[regularization_type])
if "step size" in options.keys():
if not restart_flag:
step_size_array = []
else:
if (Path(path_save) / 'step_size_array.npy').exists():
step_size_array = np.load(Path(path_save) / 'step_size_array.npy').tolist()
else:
step_size_array = np.zeros((nb_iteration_init,)).tolist()
step_size_array += [options["step size"] for i in range(iteration - nb_iteration_init)]
np.save(Path(path_save) / 'step_size_array.npy', step_size_array)