Source code for source_localization.adjoint_convection_diffusion_2D

import os
import sys
from pathlib import Path

import numpy as np
from scipy.sparse.linalg import cg
from scipy.sparse.linalg import gmres

from pheromone_dispersion.advection_operator import AdvectionAdjoint
from pheromone_dispersion.diffusion_operator import Diffusion
from pheromone_dispersion.identity_operator import Id
from pheromone_dispersion.reaction_operator import Reaction


[docs] class AdjointDiffusionConvectionReaction2DEquation: r""" Class containing the adjoint model of the 2D diffusion-convection-reaction PDE model : .. math:: \partial_tc^* + \nabla\cdot(\mathbf{K}^T\nabla c^*)+\nabla(\vec{u}c^*)-(\nabla.\vec{u})c^*-\tau_{loss}c^* = \left(\frac{dm}{dc}(c(s))\right)^*\cdot2\mathbf{R}^{-1}\left(m(c(s))-m_{obs}\right) ~\forall (x,y)\in\Omega~\forall t\in[0;T[ with the final and boundary conditions: - a null final condition :math:`c(x,y,t=T)=0~\forall (x,y)\in\Omega`, - a null diffusive flux :math:`\mathbf{K}^T\nabla c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega`, - a null outgoing convective flux :math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}>0\}~\forall t\in]0;T]` with :math:`\vec{n}` the outgoing normal vector, and its solvers. Attributes ---------- msh : ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. A_adjoint : ~pheromone_dispersion.advection_operator.AdvectionAdjoint The adjoint of the convection linear operator :math:`A^*:c^*\mapsto - \nabla\cdot(\vec{u}c^*)+(\nabla.\vec{u})c^*~\forall (x,y)\in\Omega~\forall t\in]0;T]` with the boundary condition :math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}>0\}~\forall t\in]0;T]`. D : ~pheromone_dispersion.diffusion_operator.Diffusion The diffusion linear operator with its adjoint operator :math:`D^*:c\mapsto \nabla\cdot(\mathbf{K}^T\nabla c)~\forall (x,y)\in\Omega~\forall t\in]0;T]` R : ~pheromone_dispersion.reaction_operator.Reaction The reaction linear operator with its adjoint operator :math:`R^*:c\mapsto \tau_{loss}c~\forall (x,y)\in\Omega~\forall t\in]0;T]`. ID : ~pheromone_dispersion.identity_operator.Id The identity operator :math:`Id:c\mapsto c~\forall (x,y)\in\Omega~\forall t\in]0;T]`. implemented_solver_type: list of str The list of keywords of the implemented types of time discretization. The implemented types of time discretization are: `['implicit', 'semi-implicit', 'semi-implicit with matrix inversion', 'implicit with stationnary matrix inversion']` time_discretization: str The keyword identifying the type of time discretization. The type of time discretization should be the same the one used for the pheromone propagation model implemented in the class :class:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation` to ensure that the adjoint of the discretization of the pheromone propagation model coincides with the discretization of the adjoint model. tol_inversion : float Tolerance for the inversion estimation algorithm called at each time step. transpose_inv_matrix_implicit_part: ~numpy.ndarray or None Inverse matrix of the implicit part matrix of the time discretization of the adjoint model. If the time discretizations are the same, this matrix coincides with the transpose of the inverse matrix of the implicit part matrix of the time discretization of the pheromone propagation implemented in the class :class:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation`. Initialized to `None`. Notes ----- In a future version of the module, the adjoint operator of the convection operator :math:`A^*:c^*\mapsto - \nabla\cdot(\vec{u}c^*)+(\nabla.\vec{u})c^*` will be implemented in the :class:`~pheromone_dispersion.advection_operator.Advection` class, instead of implemented in the :class:`~pheromone_dispersion.advection_operator.AdvectionAdjoint` class. """
[docs] def __init__(self, U, K, coeff_depot, msh, time_discretization='semi-implicit', tol_inversion=1e-14): r""" Constructor method Parameters ---------- msh: ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. U: ~pheromone_dispersion.velocity.Velocity The wind field :math:`\vec{u}(x,y,t)`. K: ~pheromone_dispersion.diffusion_tensor.DiffusionTensor The diffusion tensor :math:`\mathbf{K}(x,y,t)`. coeff_depot: ~numpy.ndarray The deposition coefficient :math:`\tau_{loss}(x,y)`. obs : ~source_localization.obs.Obs Object containing all the features related to the observations and estimation of the observed variables time_discretization: str, default: 'semi-implicit' The keyword identifying the type of time discretization. tol_inversion: float, optional, default: 1e-14 Tolerance for the inversion estimation algorithm called at each time step. Raises ------ ValueError if the type of discretization is not implemented, i.e. if :attr:`time_discretization` is not in :attr:`implemented_solver_type` """ self.msh = msh self.D = Diffusion(K, msh) self.R = Reaction(coeff_depot, msh) self.Id = Id(msh) self.time_discretization = time_discretization self.implemented_solver_type = [ 'implicit', 'semi-implicit', 'semi-implicit with matrix inversion', 'implicit with stationnary matrix inversion', ] if self.time_discretization not in self.implemented_solver_type: raise ValueError("The given time discretization is not implemented.") self.A_adjoint = AdvectionAdjoint(U, msh) self.tol_inversion = tol_inversion self.transpose_inv_matrix_implicit_part = None
[docs] def init_inverse_matrix(self, path_to_matrix=None, matrix_file_name=None): """ Initialize the attribute :attr:`transpose_inv_matrix_implicit_part` if needed. If the inverse matrix of the implicit part matrix of the time discretization of the associated pheromone propagation model has already been computed, it loads the previously computed inverse matrix and compute the adjoint. Otherwise, raises an error as the matrix should have been computed when initializing the pheromone propagation model as object of the class :class:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation` using the method :meth:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation.init_inverse_matrix`. Parameters ---------- path_to_matrix: str, optional, default: None Path where to save or load the inverse matrix. If not provided, set to `'./data'`. matrix_file_name: str, optional, default: None Name of the file where the matrix stored or will be saved. If not provided, set to `'inv_matrix_**_scheme'` with ** either `'implicit'` or `'semi_implicit'` depending on the time discretization. Raises ------ ValueError if no files correspond to the provided path and file name. Notes ----- This method is usefull only if :attr:`time_discretization` is either `'semi-implicit with matrix inversion'` or `'implicit with stationnary matrix inversion'`. Otherwise, the linear system to solve at each time steps is solved using conjugate gradient or GMRES algorithm, and the attribute :attr:`transpose_inv_matrix_implicit_part` is not used. """ if path_to_matrix is None: path_to_matrix = './data' if not os.path.isdir(path_to_matrix): os.makedirs(path_to_matrix) if matrix_file_name is None: if self.time_discretization == 'semi-implicit with matrix inversion': matrix_file_name = 'inv_matrix_semi_implicit_scheme' if self.time_discretization == 'implicit with stationnary matrix inversion': matrix_file_name = 'inv_matrix_implicit_scheme' if matrix_file_name[-4:] != '.npy': matrix_file_name += '.npy' if not (Path(path_to_matrix) / matrix_file_name).exists(): raise ValueError("The file to load does not exist. Either the path or the file name are wrong") else: print("=== Load of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===") if self.time_discretization == 'semi-implicit with matrix inversion': self.transpose_inv_matrix_implicit_part = np.transpose(np.load(Path(path_to_matrix) / matrix_file_name)) if self.time_discretization == 'implicit with stationnary matrix inversion': self.transpose_inv_matrix_implicit_part = np.transpose(np.load(Path(path_to_matrix) / matrix_file_name))
[docs] def at_current_time(self, tc): r""" Update the attributes :attr:`A_adjoint` and :attr:`D` at a given time. Parameters ---------- tc : float or integer The current time. Notes ----- Updates the attributes :attr:`A_adjoint` and :attr:`D` their own attributes using the method :meth:`~pheromone_dispersion.advection_operator.AdvectionAdjoint.at_current_time` of resp. the class :class:`~pheromone_dispersion.advection_operator.AdvectionAdjoint` and the class :class:`~pheromone_dispersion.diffusion_operator.Diffusion`. """ self.D.at_current_time(tc) self.A_adjoint.at_current_time(tc)
[docs] def solver(self, adjoint_derivative_obs_operator, cost, display_flag=True): r""" Compute the adjoint state :math:`c^*(x,y,t)` by solving the adjoint model on the whole time window. Parameters ---------- adjoint_derivative_obs_operator: callable Function that returns the adjoint of the derivative of the observation operator map :math:`\left(\left(\frac{dm}{dc}\right)^*\cdot\delta m\right) (x,y)` given the current time :math:`t` and an element of the space of the observed variable :math:`\delta m`. cost: ~source_localization.cost.Cost The cost function, containing especially the gap between the observation data and the prediction of the observation variable :math:`2\mathbf{R}^{-1}\left(m(c(s))-m_{obs}\right)`. display_flag: bool, default: True If `True`, print the evolution of the solver through the time iterations. Returns ------- p_out: ~numpy.ndarray The adjoint state :math:`c^*(x,y,t)`. Notes ----- The output :math:`c^*(x,y,t)` has been raveled into a (:attr:`msh.t_array.size` * :attr:`msh.y.size` * :attr:`msh.x.size`,)-shape array to match the format of the attribute :attr:`~source_localization.control.Control.value` of the class :class:`~source_localization.control.Control`. """ # initialization of the unknown variable at the final time and of the output p = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],)) p_out = np.array([]) # loop until the initial time is reached (backward in time) for it, self.msh.t in enumerate(self.msh.t_array[::-1]): if display_flag: sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s') sys.stdout.flush() # update the coefficients of the equation at the current time and self.at_current_time(self.msh.t) # inverse the linear system using a conjugate gradient method for the current time step p_old = np.copy(p) # NECESSARY??? # inverse the linear system resulting the semi-implicit time discretization # using a conjugate gradient method for the current time step if self.time_discretization == 'semi-implicit': p, info = cg( self.Id + self.msh.dt * (-self.D + self.R), p_old - self.msh.dt * (self.A_adjoint.rmatvec(p_old) + adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d())), x0=p_old, tol=self.tol_inversion, ) # inverse the linear system resulting the semi-implicit time discretization # using the pre-computed inverse matrix elif self.time_discretization == 'semi-implicit with stationnary matrix inversion': RHS = p_old - self.msh.dt * ( self.A_adjoint.rmatvec(p_old) + adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d()) ) if not np.linalg.norm(RHS, ord=np.inf) < 1e-16: LHS = (self.Id + self.msh.dt * (-self.D + self.R)).matvec(p_old) flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS) if flag_residu: p = np.dot(self.transpose_inv_matrix_implicit_part, RHS) else: p = np.zeros_like(p) info = 0 # inverse the linear system resulting the implicit time discretization # using a gmres method for the current time step elif self.time_discretization == 'implicit': p, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A_adjoint), p_old - self.msh.dt * adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d()), x0=p_old, tol=self.tol_inversion, ) # inverse the linear system resulting the implicit time discretization # using the pre-computed inverse matrix elif self.time_discretization == 'implicit with stationnary matrix inversion': RHS = p_old - self.msh.dt * adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d()) if not np.linalg.norm(RHS, ord=np.inf) < 1e-16: LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A_adjoint)).matvec(p_old) flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS) if flag_residu: p = np.dot(self.transpose_inv_matrix_implicit_part, RHS) else: p = np.zeros_like(p) info = 0 if info > 0: raise ValueError( "The algorithme used to solve the linear system has not converge" + "to the expected tolerance or within the maximum number of iteration." ) if info < 0: raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.") # store the result in the output variable p_out = np.append(np.copy(p), p_out) # np.copy NECESSARY??? return p_out