import os
import sys
from pathlib import Path
import numpy as np
from scipy.sparse.linalg import cg
from scipy.sparse.linalg import gmres
from pheromone_dispersion.advection_operator import AdvectionAdjoint
from pheromone_dispersion.diffusion_operator import Diffusion
from pheromone_dispersion.identity_operator import Id
from pheromone_dispersion.reaction_operator import Reaction
[docs]
class AdjointDiffusionConvectionReaction2DEquation:
r"""
Class containing the adjoint model of the 2D diffusion-convection-reaction PDE model :
.. math::
\partial_tc^* + \nabla\cdot(\mathbf{K}^T\nabla c^*)+\nabla(\vec{u}c^*)-(\nabla.\vec{u})c^*-\tau_{loss}c^*
= \left(\frac{dm}{dc}(c(s))\right)^*\cdot2\mathbf{R}^{-1}\left(m(c(s))-m_{obs}\right)
~\forall (x,y)\in\Omega~\forall t\in[0;T[
with the final and boundary conditions:
- a null final condition :math:`c(x,y,t=T)=0~\forall (x,y)\in\Omega`,
- a null diffusive flux :math:`\mathbf{K}^T\nabla c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega`,
- a null outgoing convective flux
:math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}>0\}~\forall t\in]0;T]`
with :math:`\vec{n}` the outgoing normal vector,
and its solvers.
Attributes
----------
msh : ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
A_adjoint : ~pheromone_dispersion.advection_operator.AdvectionAdjoint
The adjoint of the convection linear operator
:math:`A^*:c^*\mapsto - \nabla\cdot(\vec{u}c^*)+(\nabla.\vec{u})c^*~\forall (x,y)\in\Omega~\forall t\in]0;T]`
with the boundary condition
:math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}>0\}~\forall t\in]0;T]`.
D : ~pheromone_dispersion.diffusion_operator.Diffusion
The diffusion linear operator with its adjoint operator
:math:`D^*:c\mapsto \nabla\cdot(\mathbf{K}^T\nabla c)~\forall (x,y)\in\Omega~\forall t\in]0;T]`
R : ~pheromone_dispersion.reaction_operator.Reaction
The reaction linear operator with its adjoint operator :math:`R^*:c\mapsto \tau_{loss}c~\forall (x,y)\in\Omega~\forall t\in]0;T]`.
ID : ~pheromone_dispersion.identity_operator.Id
The identity operator :math:`Id:c\mapsto c~\forall (x,y)\in\Omega~\forall t\in]0;T]`.
implemented_solver_type: list of str
The list of keywords of the implemented types of time discretization.
The implemented types of time discretization are:
`['implicit', 'semi-implicit', 'semi-implicit with matrix inversion', 'implicit with stationnary matrix inversion']`
time_discretization: str
The keyword identifying the type of time discretization.
The type of time discretization should be the same the one
used for the pheromone propagation model implemented in the class
:class:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation`
to ensure that the adjoint of the discretization of the pheromone propagation model
coincides with the discretization of the adjoint model.
tol_inversion : float
Tolerance for the inversion estimation algorithm called at each time step.
transpose_inv_matrix_implicit_part: ~numpy.ndarray or None
Inverse matrix of the implicit part matrix of the time discretization of the adjoint model.
If the time discretizations are the same, this matrix coincides
with the transpose of the inverse matrix of the implicit part matrix
of the time discretization of the pheromone propagation implemented
in the class
:class:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation`.
Initialized to `None`.
Notes
-----
In a future version of the module, the adjoint operator of the convection operator
:math:`A^*:c^*\mapsto - \nabla\cdot(\vec{u}c^*)+(\nabla.\vec{u})c^*` will be implemented
in the :class:`~pheromone_dispersion.advection_operator.Advection` class,
instead of implemented in the :class:`~pheromone_dispersion.advection_operator.AdvectionAdjoint` class.
"""
[docs]
def __init__(self, U, K, coeff_depot, msh, time_discretization='semi-implicit', tol_inversion=1e-14):
r"""
Constructor method
Parameters
----------
msh: ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
U: ~pheromone_dispersion.velocity.Velocity
The wind field :math:`\vec{u}(x,y,t)`.
K: ~pheromone_dispersion.diffusion_tensor.DiffusionTensor
The diffusion tensor :math:`\mathbf{K}(x,y,t)`.
coeff_depot: ~numpy.ndarray
The deposition coefficient :math:`\tau_{loss}(x,y)`.
obs : ~source_localization.obs.Obs
Object containing all the features related to the observations and estimation of the observed variables
time_discretization: str, default: 'semi-implicit'
The keyword identifying the type of time discretization.
tol_inversion: float, optional, default: 1e-14
Tolerance for the inversion estimation algorithm called at each time step.
Raises
------
ValueError
if the type of discretization is not implemented,
i.e. if :attr:`time_discretization` is not in :attr:`implemented_solver_type`
"""
self.msh = msh
self.D = Diffusion(K, msh)
self.R = Reaction(coeff_depot, msh)
self.Id = Id(msh)
self.time_discretization = time_discretization
self.implemented_solver_type = [
'implicit',
'semi-implicit',
'semi-implicit with matrix inversion',
'implicit with stationnary matrix inversion',
]
if self.time_discretization not in self.implemented_solver_type:
raise ValueError("The given time discretization is not implemented.")
self.A_adjoint = AdvectionAdjoint(U, msh)
self.tol_inversion = tol_inversion
self.transpose_inv_matrix_implicit_part = None
[docs]
def init_inverse_matrix(self, path_to_matrix=None, matrix_file_name=None):
"""
Initialize the attribute :attr:`transpose_inv_matrix_implicit_part` if needed.
If the inverse matrix of the implicit part matrix of the time discretization
of the associated pheromone propagation model has already been computed,
it loads the previously computed inverse matrix and compute the adjoint.
Otherwise, raises an error as the matrix should have been computed
when initializing the pheromone propagation model as object
of the class :class:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation`
using the method :meth:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation.init_inverse_matrix`.
Parameters
----------
path_to_matrix: str, optional, default: None
Path where to save or load the inverse matrix.
If not provided, set to `'./data'`.
matrix_file_name: str, optional, default: None
Name of the file where the matrix stored or will be saved.
If not provided, set to `'inv_matrix_**_scheme'`
with ** either `'implicit'` or `'semi_implicit'` depending on the time discretization.
Raises
------
ValueError
if no files correspond to the provided path and file name.
Notes
-----
This method is usefull only if :attr:`time_discretization` is
either `'semi-implicit with matrix inversion'` or `'implicit with stationnary matrix inversion'`.
Otherwise, the linear system to solve at each time steps is solved using conjugate gradient or GMRES algorithm,
and the attribute :attr:`transpose_inv_matrix_implicit_part` is not used.
"""
if path_to_matrix is None:
path_to_matrix = './data'
if not os.path.isdir(path_to_matrix):
os.makedirs(path_to_matrix)
if matrix_file_name is None:
if self.time_discretization == 'semi-implicit with matrix inversion':
matrix_file_name = 'inv_matrix_semi_implicit_scheme'
if self.time_discretization == 'implicit with stationnary matrix inversion':
matrix_file_name = 'inv_matrix_implicit_scheme'
if matrix_file_name[-4:] != '.npy':
matrix_file_name += '.npy'
if not (Path(path_to_matrix) / matrix_file_name).exists():
raise ValueError("The file to load does not exist. Either the path or the file name are wrong")
else:
print("=== Load of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===")
if self.time_discretization == 'semi-implicit with matrix inversion':
self.transpose_inv_matrix_implicit_part = np.transpose(np.load(Path(path_to_matrix) / matrix_file_name))
if self.time_discretization == 'implicit with stationnary matrix inversion':
self.transpose_inv_matrix_implicit_part = np.transpose(np.load(Path(path_to_matrix) / matrix_file_name))
[docs]
def at_current_time(self, tc):
r"""
Update the attributes :attr:`A_adjoint` and :attr:`D` at a given time.
Parameters
----------
tc : float or integer
The current time.
Notes
-----
Updates the attributes :attr:`A_adjoint` and :attr:`D` their own attributes
using the method :meth:`~pheromone_dispersion.advection_operator.AdvectionAdjoint.at_current_time` of resp.
the class :class:`~pheromone_dispersion.advection_operator.AdvectionAdjoint` and
the class :class:`~pheromone_dispersion.diffusion_operator.Diffusion`.
"""
self.D.at_current_time(tc)
self.A_adjoint.at_current_time(tc)
[docs]
def solver(self, adjoint_derivative_obs_operator, cost, display_flag=True):
r"""
Compute the adjoint state :math:`c^*(x,y,t)` by solving the adjoint model on the whole time window.
Parameters
----------
adjoint_derivative_obs_operator: callable
Function that returns the adjoint of the derivative of the observation operator map
:math:`\left(\left(\frac{dm}{dc}\right)^*\cdot\delta m\right) (x,y)`
given the current time :math:`t` and
an element of the space of the observed variable :math:`\delta m`.
cost: ~source_localization.cost.Cost
The cost function,
containing especially the gap between the observation data
and the prediction of the observation variable
:math:`2\mathbf{R}^{-1}\left(m(c(s))-m_{obs}\right)`.
display_flag: bool, default: True
If `True`, print the evolution of the solver through the time iterations.
Returns
-------
p_out: ~numpy.ndarray
The adjoint state :math:`c^*(x,y,t)`.
Notes
-----
The output :math:`c^*(x,y,t)` has been raveled into
a (:attr:`msh.t_array.size` * :attr:`msh.y.size` * :attr:`msh.x.size`,)-shape array
to match the format of the attribute :attr:`~source_localization.control.Control.value`
of the class :class:`~source_localization.control.Control`.
"""
# initialization of the unknown variable at the final time and of the output
p = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],))
p_out = np.array([])
# loop until the initial time is reached (backward in time)
for it, self.msh.t in enumerate(self.msh.t_array[::-1]):
if display_flag:
sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s')
sys.stdout.flush()
# update the coefficients of the equation at the current time and
self.at_current_time(self.msh.t)
# inverse the linear system using a conjugate gradient method for the current time step
p_old = np.copy(p) # NECESSARY???
# inverse the linear system resulting the semi-implicit time discretization
# using a conjugate gradient method for the current time step
if self.time_discretization == 'semi-implicit':
p, info = cg(
self.Id + self.msh.dt * (-self.D + self.R),
p_old
- self.msh.dt
* (self.A_adjoint.rmatvec(p_old) + adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d())),
x0=p_old,
tol=self.tol_inversion,
)
# inverse the linear system resulting the semi-implicit time discretization
# using the pre-computed inverse matrix
elif self.time_discretization == 'semi-implicit with stationnary matrix inversion':
RHS = p_old - self.msh.dt * (
self.A_adjoint.rmatvec(p_old) + adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d())
)
if not np.linalg.norm(RHS, ord=np.inf) < 1e-16:
LHS = (self.Id + self.msh.dt * (-self.D + self.R)).matvec(p_old)
flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS)
if flag_residu:
p = np.dot(self.transpose_inv_matrix_implicit_part, RHS)
else:
p = np.zeros_like(p)
info = 0
# inverse the linear system resulting the implicit time discretization
# using a gmres method for the current time step
elif self.time_discretization == 'implicit':
p, info = gmres(
self.Id + self.msh.dt * (-self.D + self.R + self.A_adjoint),
p_old - self.msh.dt * adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d()),
x0=p_old,
tol=self.tol_inversion,
)
# inverse the linear system resulting the implicit time discretization
# using the pre-computed inverse matrix
elif self.time_discretization == 'implicit with stationnary matrix inversion':
RHS = p_old - self.msh.dt * adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d())
if not np.linalg.norm(RHS, ord=np.inf) < 1e-16:
LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A_adjoint)).matvec(p_old)
flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS)
if flag_residu:
p = np.dot(self.transpose_inv_matrix_implicit_part, RHS)
else:
p = np.zeros_like(p)
info = 0
if info > 0:
raise ValueError(
"The algorithme used to solve the linear system has not converge"
+ "to the expected tolerance or within the maximum number of iteration."
)
if info < 0:
raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.")
# store the result in the output variable
p_out = np.append(np.copy(p), p_out) # np.copy NECESSARY???
return p_out