Source code for source_localization.adjoint_convection_diffusion_2D

import os
import sys
from pathlib import Path

import numpy as np
from scipy.sparse.linalg import cg
from scipy.sparse.linalg import gmres

from pheromone_dispersion.advection_operator import Advection
from pheromone_dispersion.advection_operator import AdvectionAdjoint
from pheromone_dispersion.diffusion_operator import Diffusion
from pheromone_dispersion.identity_operator import Id
from pheromone_dispersion.reaction_operator import Reaction


[docs] class AdjointDiffusionConvectionReaction2DEquation: """ Class containing the adjoint model of the 2D diffusion-convection-reaction PDE model and its solvers """
[docs] def __init__(self, U, K, coeff_depot, msh, time_discretization='semi-implicit', tol_inversion=1e-14): """ Instanciation of the class. - input: * msh: object of the class mesh_rect_2D * U: object of the class velocity * K: object of the class diffusion_tensor * coeff_depot: array of shape (msh.y, msh.x) containing the deposition coefficient * obs: object of the class Obs, contains the observation operator and its derivative wrt the state variable * msh: object of the class mesh_rect_2D * time_discretization: string, describes the type of time discretization by default is 'semi-implicit', can also be 'implicit' - attributes: * msh: object of the class mesh_rect_2D * A: object of the class Advection, if the time discretization is semi-implicit, contains the convection linear operator * A_adjoint: object of the class Advection, if the time discretization is implicit, contains the adjoint of the convection linear operator * D: oject of the class Diffusion, contains the diffusion linear operator * R: object of the class Reaction, contains the reaction linear operator * ID: object of the class Id, contains the identity operator * obs: object of the class Obs, contains the observation operator and its derivative wrt the state variable * negative_divU_advection_term: object of the class Reaction, if the time discretization is semi-implicit, contains negative part of the reaction term in div U coming from the splitting of the adjoint of the advection term into a advection flux part and a reaction part * positive_divU_advection_term: object of the class Reaction, if the time discretization is semi-implicit, contains positive part of the reaction term in div U coming from the splitting of the adjoint of the advection term into a advection flux part and a reaction part * time_discretization: string, describes the type of time discretization """ self.msh = msh self.D = Diffusion(K, msh) self.R = Reaction(coeff_depot, msh) self.Id = Id(msh) self.time_discretization = time_discretization implemented_solver_type = [ 'implicit', 'semi-implicit', 'implicit with matrix inversion', 'semi-implicit with matrix inversion', 'implicit with preconditionning', 'implicit with stationnary matrix inversion', ] if self.time_discretization not in implemented_solver_type: raise ValueError("The given time discretization is not implemented.") if time_discretization == 'semi-implicit': self.A = Advection(U, msh) pos_div_val = np.where(self.A.U.div >= 0, self.A.U.div, 0.0) neg_div_val = np.where(self.A.U.div < 0, self.A.U.div, 0.0) self.negative_divU_advection_term = Reaction(neg_div_val, msh) self.positive_divU_advection_term = Reaction(pos_div_val, msh) self.A_adjoint = AdvectionAdjoint(U, msh) # elif time_discretization == 'implicit': # self.A_adjoint = AdvectionAdjoint(U, msh) self.tol_inversion = tol_inversion self.transpose_inv_matrix_semi_implicit_scheme = None self.transpose_inv_matrix_implicit_scheme = None self.ILU_decomp_dic = {} self.jacobi = None self.inv_matrix_implicit_scheme_t_init = None
[docs] def init_inverse_matrix(self, path_to_matrix=None, matrix_file_name=None): if path_to_matrix is None: path_to_matrix = './data' if not os.path.isdir(path_to_matrix): os.makedirs(path_to_matrix) if matrix_file_name is None: if self.time_discretization == 'semi-implicit with matrix inversion': matrix_file_name = 'inv_matrix_semi_implicit_scheme' if self.time_discretization == 'implicit with matrix inversion': matrix_file_name = 'inv_matrix_implicit_scheme' if self.time_discretization == 'implicit with stationnary matrix inversion': matrix_file_name = 'inv_matrix_implicit_scheme' matrix_file_name += '.npy' if not (Path(path_to_matrix) / matrix_file_name).exists(): raise ValueError("The file to load does not exist. Either the path or the file name are wrong") else: print("=== Load of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===") if self.time_discretization == 'semi-implicit with matrix inversion': self.transpose_inv_matrix_semi_implicit = np.transpose(np.load(Path(path_to_matrix) / matrix_file_name)) if self.time_discretization == 'implicit with matrix inversion': self.transpose_inv_matrix_implicit_scheme = np.transpose(np.load(Path(path_to_matrix) / matrix_file_name)) if self.time_discretization == 'implicit with stationnary matrix inversion': self.transpose_inv_matrix_implicit_scheme = np.transpose(np.load(Path(path_to_matrix) / matrix_file_name))
[docs] def at_current_time(self, tc): """ Update the linear operators of the PDE at a given time, i.e. update the attributes D and S of the class if the time discretization is semi-implicit, A, positive_divU_advection_term, negative_divU_advection_term, if the time discretization is implicit, A_adjoint, and the derivative of the observation operator of the attribute obs. - input: * tc: the current time """ self.D.at_current_time(tc) if self.time_discretization == 'semi-implicit': self.A.at_current_time(tc, self.A.minus_U) pos_div_val = np.where(self.A.U.div >= 0, self.A.U.div, 0.0) neg_div_val = np.where(self.A.U.div < 0, self.A.U.div, 0.0) self.negative_divU_advection_term.update_reaction_coeff(neg_div_val) self.positive_divU_advection_term.update_reaction_coeff(pos_div_val) # elif self.time_discretization == 'implicit': self.A_adjoint.at_current_time(tc)
[docs] def solver(self, adjoint_derivative_obs_operator, cost, display_flag=True): """ solve the PDE on the whole time window - input: * cost: object of the class Cost, contains especially a method that compute the gradient of the objectif wrt the observed variable for the current optimization iteration and estimation of the observed variable * display_flag: boolean, True by default, print the evolution in time of the solver if True - output: * p: numpy array of shape (msh.t_array.size * msh.y.size * msh.x.size,), contains the adjoint state everywhere and at all time step concatenated in a vector """ # initialization of the unknown variable at the final time and of the output p = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],)) p_out = np.array([]) # loop until the initial time is reached (backward in time) for it, self.msh.t in enumerate(self.msh.t_array[::-1]): if display_flag: sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s') sys.stdout.flush() # update the coefficients of the equation at the current time and self.at_current_time(self.msh.t) # inverse the linear system using a conjugate gradient method for the current time step p_old = np.copy(p) # NECESSARY??? # inverse the linear system resulting the semi-implicit time discretization # using a conjugate gradient method for the current time step if self.time_discretization == 'semi-implicit': p, info = cg( self.Id + self.msh.dt * (-self.D + self.R + self.negative_divU_advection_term), p_old - self.msh.dt * ( self.A.rmatvec(p_old) + self.positive_divU_advection_term(p_old) + adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d()) ), x0=p_old, tol=self.tol_inversion, ) # p = self.transpose_inv_matrix_semi_implicit.dot( # p_old # - self.msh.dt # * (-self.A_adjoint.matvec(p_old) + adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d())) # ) # inverse the linear system resulting the implicit time discretization using a gmres method for the current time step elif self.time_discretization == 'implicit': p, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A_adjoint), p_old - self.msh.dt * adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d()), x0=p_old, tol=self.tol_inversion, ) elif self.time_discretization == 'implicit with matrix inversion': p = self.transpose_inv_matrix_implicit_scheme[it, :, :].dot( p_old - self.msh.dt * adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d()) ) info = 0 elif self.time_discretization == 'implicit with stationnary matrix inversion': RHS = p_old - self.msh.dt * adjoint_derivative_obs_operator(self.msh.t, cost.gradient_objectif_wrt_d()) if not np.linalg.norm(RHS, ord=np.inf) < 1e-16: LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A_adjoint)).matvec(p_old) flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS) if flag_residu: p = np.dot(self.transpose_inv_matrix_implicit_scheme, RHS) else: p = np.zeros_like(p) info = 0 if info > 0: raise ValueError( "The algorithme used to solve the linear system has not converge" + "to the expected tolerance or within the maximum number of iteration." ) if info < 0: raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.") # store the result in the output variable p_out = np.append(np.copy(p), p_out) # np.copy NECESSARY??? return p_out