Source code for pheromone_dispersion.convection_diffusion_2D

import os
import sys
import time
from pathlib import Path

import numpy as np
import scipy as sp
from scipy.sparse.linalg import cg
from scipy.sparse.linalg import gmres
from scipy.sparse.linalg import spsolve_triangular

from pheromone_dispersion.advection_operator import Advection
from pheromone_dispersion.diffusion_operator import Diffusion
from pheromone_dispersion.identity_operator import Id
from pheromone_dispersion.reaction_operator import Reaction
from pheromone_dispersion.source_term import Source

"""
Ajouter des tests dans le cas stationnaire pour verifier que les matrices sont 3D
"""


[docs] class DiffusionConvectionReaction2DEquation: """ Class containing the 2D diffusion-convection-reaction PDE and its solvers """
[docs] def __init__(self, U, K, coeff_depot, S, msh, time_discretization='semi-implicit', tol_inversion=1e-14): """ Instanciation of the class. - input: * msh: object of the class MeshRect2D * U: object of the class Velocity * K: object of the class DiffusionTensor * coeff_depot: array of shape (msh.y, msh.x) containing the deposition coefficient * S: object of the class Source, contains the source term * time_discretization: string, describes the type of time discretization by default is 'semi-implicit', can also be 'implicit' - attributes: * msh: object of the class MeshRect2D * A: object of the class Advection, contains the convection linear operator * D: oject of the class Diffusion, contains the diffusion linear operator * R: object of the class Reaction, contains the reaction linear operator * ID: object of the class Id, contains the identity operator * S: object of the class Source, contains the source term * time_discretization: string, describes the type of time discretization - TO DO: * Add exceptions in case the matrix are computed/initialized """ self.msh = msh self.A = Advection(U, msh) self.D = Diffusion(K, msh) self.R = Reaction(coeff_depot, msh) self.Id = Id(msh) self.S = S self.time_discretization = time_discretization self.tol_inversion = tol_inversion implemented_solver_type = [ 'implicit', 'semi-implicit', 'implicit with matrix inversion', 'semi-implicit with matrix inversion', 'implicit with preconditionning', 'implicit with stationnary matrix inversion', ] if self.time_discretization not in implemented_solver_type: raise ValueError("The given time discretization is not implemented.") self.inv_matrix_semi_implicit_scheme = None self.inv_matrix_implicit_scheme = None self.ILU_decomp_dic = {} self.jacobi = None self.inv_matrix_implicit_scheme_t_init = None
[docs] def spsolve_lu(self, b): if self.ILU_decomp_dic['perm r'] is not None: b_old = b.copy() for old_ndx, new_ndx in enumerate(self.ILU_decomp_dic['perm r']): b[new_ndx] = b_old[old_ndx] try: # unit_diagonal is a new kw c = spsolve_triangular(self.ILU_decomp_dic['L'], b, lower=True, unit_diagonal=True) except TypeError: c = spsolve_triangular(self.ILU_decomp_dic['L'], b, lower=True) px = spsolve_triangular(self.ILU_decomp_dic['U'], c, lower=False) if self.ILU_decomp_dic['perm c'] is None: return px return px[self.ILU_decomp_dic['perm c']]
[docs] def init_inverse_matrix(self, path_to_matrix=None, matrix_file_name=None): if path_to_matrix is None: path_to_matrix = './data' if not os.path.isdir(path_to_matrix): os.makedirs(path_to_matrix) if matrix_file_name is None: if self.time_discretization == 'semi-implicit with matrix inversion': matrix_file_name = 'inv_matrix_semi_implicit_scheme' if self.time_discretization == 'implicit with matrix inversion': matrix_file_name = 'inv_matrix_implicit_scheme' if self.time_discretization == 'implicit with stationnary matrix inversion': matrix_file_name = 'inv_matrix_implicit_scheme' matrix_file_name += '.npy' if not (Path(path_to_matrix) / matrix_file_name).exists(): print("=== Computation of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===") Identity = np.identity(self.msh.y.size * self.msh.x.size) if self.time_discretization == 'semi-implicit with matrix inversion': matrix_semi_implicit_scheme = Identity + self.msh.dt * (-self.D._matmat(Identity) + self.R._matmat(Identity)) self.inv_matrix_semi_implicit_scheme = sp.linalg.inv(matrix_semi_implicit_scheme) np.save(Path(path_to_matrix) / matrix_file_name + '.npy', self.inv_matrix_semi_implicit_scheme) if self.time_discretization == 'implicit with matrix inversion': t_i = time.time() for it, self.msh.t in enumerate(self.msh.t_array[1:]): self.at_current_time(self.msh.t) matrix_implicit_scheme = Identity + self.msh.dt * ( -self.D._matmat(Identity) + self.R._matmat(Identity) + self.A._matmat(Identity) ) self.inv_matrix_implicit_scheme = sp.linalg.inv(matrix_implicit_scheme) print("--- Computation at time t= ", self.msh.t, "in ", time.time() - t_i, " s") t_i = time.time() np.save( Path(path_to_matrix) / matrix_file_name + f'_ite{"{0:04d}".format(it+1)}' + '.npy', self.inv_matrix_implicit_scheme ) if self.time_discretization == 'implicit with stationnary matrix inversion': t_i = time.time() matrix_implicit_scheme = Identity + self.msh.dt * ( -self.D._matmat(Identity) + self.R._matmat(Identity) + self.A._matmat(Identity) ) self.inv_matrix_implicit_scheme = sp.linalg.inv(matrix_implicit_scheme) print("--- Computation at time t= ", self.msh.t, "in ", time.time() - t_i, " s") t_i = time.time() np.save(Path(path_to_matrix) / matrix_file_name + '.npy', self.inv_matrix_implicit_scheme) else: print("=== Load of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===") if self.time_discretization == 'semi-implicit with matrix inversion': self.inv_matrix_semi_implicit_scheme = np.load(Path(path_to_matrix) / matrix_file_name) if self.time_discretization == 'implicit with matrix inversion': self.inv_matrix_implicit_scheme = np.load(Path(path_to_matrix) / matrix_file_name) if self.time_discretization == 'implicit with stationnary matrix inversion': self.inv_matrix_implicit_scheme = np.load(Path(path_to_matrix) / matrix_file_name) if self.time_discretization == 'implicit with preconditionning': self.inv_matrix_implicit_scheme_t_init = np.load(Path(path_to_matrix) / matrix_file_name)
[docs] def set_source(self, value, t=None): """ Update the source term with the values provided as input - input: * value: numpy array of shape (t.size, msh.y.size, msh.x.size), the new values of the source term * t: numpy array of shape (t.size,), the vector containing the associated times, by default None if the source is stationnary """ setattr(self, 'S', Source(self.msh, value, t=t))
[docs] def at_current_time(self, tc, i=None): """ Update the linear operators of the PDE (attributes A, D and S of the class) at a given time. - input: * tc: the current time """ self.S.at_current_time(tc) if not self.time_discretization == 'implicit with stationnary matrix inversion': self.A.at_current_time(tc, self.A.U) self.D.at_current_time(tc)
# if self.time_discretization == 'implicit with preconditionning': # #dic_name = 'ILU_decomp_dic' # path = './output' # matrix_name = 'Jacobi_matrix' # matrix_file_name = matrix_name + f'_ite{"{0:04d}".format(i+1)}.npz' # self.jacobi = sps.load_npz(Path(path) / matrix_file_name) # dic_file_name = dic_name + f'_ite{"{0:04d}".format(i+1)}.pklz' # with gzip.open(Path(path) / dic_file_name, 'rb') as f: # self.ILU_decomp_dic = pickle.load(f)
[docs] def solver_one_time_step(self, c): """ solve the PDE for one time step inverse the linear system (I + dt (- D + R)).c^{n+1} = c^n + dt (- A.c^n + S) - input: * c: numpy array of shape (msh.y.size, msh.x.size), contains the concentration of pheromones at the previous time step - output: * numpy array of shape (msh.y.size, msh.x.size), resulting concentration at the current time step """ if c.shape != (self.msh.y.size, self.msh.x.size): raise ValueError( "The shape of the concentration at the previous time at the center of the cells does not match with the shape of the msh." ) # ravel the matrix of concentration into a msh.y.size*msh.x.size size vector to match with the format of the LinearOperator class c_ravel = c.ravel() # inverse the linear system using a conjugate gradient method c_ravel, tol = cg(self.Id + self.msh.dt * (-self.D + self.R), c_ravel + self.msh.dt * (-self.A.dot(c_ravel) + self.S.value.ravel())) return c_ravel.reshape((c.shape))
[docs] def solver(self, save_all=False, path_save='./save/', display_flag=True): """ solve the PDE on the whole time window - input: * save_all: boolean, False by default, the matrix of the concentration at every time step and the vector of time are saved in numpy arrays if True * path_save: string, './save/' by default, contains the path of the directory in which the ouputs are saved * display_flag: boolean, True by default, print the evolution in time of the solver if True - output: * c: numpy array of shape (msh.t_array.size, msh.y.size, msh.x.size), contains the concentration at all time steps """ # initialization of the unknown variable at the current time c = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],)) # initialization of the outputs array to be saved if save_all: # if the save directory does not exist, then it is created if not os.path.isdir(path_save): os.makedirs(path_save) c_save = np.zeros((self.msh.t_array.shape[0], self.msh.y.shape[0], self.msh.x.shape[0])) # loop until the final time or the steady state is reached for it, self.msh.t in enumerate(self.msh.t_array[1:]): if display_flag: sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s') sys.stdout.flush() # update the coefficients of the equation at the current time and self.at_current_time(self.msh.t, i=it) # store the concentration at the previous time step c_old = np.copy(c) # NECESSARY??? # inverse the linear system resulting the semi-implicit time discretization # using a conjugate gradient method for the current time step if self.time_discretization == 'semi-implicit with matrix inversion': c = self.inv_matrix_semi_implicit_scheme.dot(c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel())) info = 0 elif self.time_discretization == 'semi-implicit': c, info = cg( self.Id + self.msh.dt * (-self.D + self.R), c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()), x0=c_old, tol=self.tol_inversion, ) # inverse the linear system resulting the implicit time discretization using a gmres method for the current time step elif self.time_discretization == 'implicit': c, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A), c_old + self.msh.dt * self.S.value.ravel(), x0=c_old, tol=self.tol_inversion, ) elif self.time_discretization == 'implicit with preconditionning': precond = ( self.inv_matrix_implicit_scheme_t_init # self.jacobi # spsl.LinearOperator((self.msh.x.size*self.msh.y.size,self.msh.x.size*self.msh.y.size),matvec=self.spsolve_lu) ) c, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A), c_old + self.msh.dt * self.S.value.ravel(), x0=c_old, tol=self.tol_inversion, M=precond, ) elif self.time_discretization == 'implicit with matrix inversion': c = self.inv_matrix_implicit_scheme[it, :, :].dot(c_old + self.msh.dt * self.S.value.ravel()) info = 0 elif self.time_discretization == 'implicit with stationnary matrix inversion': RHS = c_old + self.msh.dt * self.S.value.ravel() LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A)).matvec(c_old) flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS) if flag_residu: c = np.dot(self.inv_matrix_implicit_scheme, RHS) info = 0 if info > 0: raise ValueError( "The algorithme used to solve the linear system has not converge" + "to the expected tolerance or within the maximum number of iteration." ) if info < 0: raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.") c_save[it + 1, :, :] = c.reshape((self.msh.y.shape[0], self.msh.x.shape[0])) # save the ouputs if save_all: if not os.path.isdir(Path(path_save)): os.makedirs(Path(path_save)) np.save(Path(path_save) / 'c.npy', c_save) return c_save
[docs] def solver_save_all(self, path_save='./save/', display_flag=True, save_rate=1000): """ solve the PDE on the whole time window - input: * save_all: boolean, False by default, the matrix of the concentration at every time step and the vector of time are saved in numpy arrays if True * path_save: string, './save/' by default, contains the path of the directory in which the ouputs are saved * display_flag: boolean, True by default, print the evolution in time of the solver if True - output: * c: numpy array of shape (msh.t_array.size, msh.y.size, msh.x.size), contains the concentration at all time steps """ # initialization of the unknown variable at the current time c = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],)) t_save = [] c_save = [] # np.zeros((, self.msh.y.shape[0] * self.msh.x.shape[0],)) # initialization of the outputs array to be saved # if the save directory does not exist, then it is created if not os.path.isdir(path_save): os.makedirs(path_save) # loop until the final time or the steady state is reached for it, self.msh.t in enumerate(self.msh.t_array[1:]): if display_flag: # print(f"\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s") sys.stdout.write(f'\n\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s') sys.stdout.flush() # update the coefficients of the equation at the current time and self.at_current_time(self.msh.t, i=it) # store the concentration at the previous time step c_old = np.copy(c) # NECESSARY??? # inverse the linear system resulting the semi-implicit time discretization # using a conjugate gradient method for the current time step if self.time_discretization == 'semi-implicit with matrix inversion': c = self.inv_matrix_semi_implicit_scheme.dot(c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel())) info = 0 elif self.time_discretization == 'semi-implicit': c, info = cg( self.Id + self.msh.dt * (-self.D + self.R), c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()), x0=c_old, tol=self.tol_inversion, ) # inverse the linear system resulting the implicit time discretization using a gmres method for the current time step elif self.time_discretization == 'implicit': c, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A), c_old + self.msh.dt * self.S.value.ravel(), x0=c_old, tol=self.tol_inversion, ) elif self.time_discretization == 'implicit with preconditionning': precond = ( self.inv_matrix_implicit_scheme_t_init # self.jacobi # spsl.LinearOperator((self.msh.x.size*self.msh.y.size,self.msh.x.size*self.msh.y.size),matvec=self.spsolve_lu) ) c, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A), c_old + self.msh.dt * self.S.value.ravel(), x0=c_old, tol=self.tol_inversion, M=precond, ) elif self.time_discretization == 'implicit with matrix inversion': c = self.inv_matrix_implicit_scheme[it, :, :].dot(c_old + self.msh.dt * self.S.value.ravel()) info = 0 elif self.time_discretization == 'implicit with stationnary matrix inversion': RHS = c_old + self.msh.dt * self.S.value.ravel() LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A)).matvec(c_old) flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS) if flag_residu: c = np.dot(self.inv_matrix_implicit_scheme, RHS) # c = np.dot(self.inv_matrix_implicit_scheme, c_old + self.msh.dt * self.S.value.ravel()) info = 0 if info > 0: raise ValueError( "The algorithme used to solve the linear system has not converge" + "to the expected tolerance or within the maximum number of iteration." ) if info < 0: raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.") if it % save_rate == 0: t_save.append(self.msh.t) c_save.append(c.reshape((self.msh.y.shape[0], self.msh.x.shape[0]))) np.save(Path(path_save) / 'c_save.npy', c_save) np.save(Path(path_save) / 't_save.npy', t_save)
[docs] def solver_est_at_obs_times(self, obs, display_flag=True): """ solve the PDE on the whole time window and store the resulting estimations of the concentration at the observations times - TO DO: * check that the function works - input: * obs: object of the class Obs, contains the observations and the observations times at which we aim to estimate the concentration - do: * solve the PDE on the whole time window but only store the resulting estimations at the observations times in the atrtibutes c_est of obs """ # initialization of the unknown variable at the current time c = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],)) if 0 in obs.index_obs_to_index_time_est: c_prov = c.reshape((self.msh.y.shape[0], self.msh.x.shape[0])) for index_obs in obs.index_time_est_to_index_obs[0]: index_x_est = np.argmin(np.abs(self.msh.x - obs.X_obs[index_obs, 0])) index_y_est = np.argmin(np.abs(self.msh.y - obs.X_obs[index_obs, 1])) obs.c_est[index_obs] = c_prov[index_y_est, index_x_est] # loop until the final time or the steady state is reached for it, self.msh.t in enumerate(self.msh.t_array[1:]): if display_flag: sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s') sys.stdout.flush() # update the coefficients of the equation at the current time and self.at_current_time(self.msh.t, i=it) # store the concentration at the previous time step c_old = np.copy(c) # NECESSARY??? # inverse the linear system resulting the semi-implicit time discretization # using a conjugate gradient method for the current time step if self.time_discretization == 'semi-implicit with matrix inversion': c = self.inv_matrix_semi_implicit_scheme.dot(c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel())) info = 0 elif self.time_discretization == 'semi-implicit': c, info = cg( self.Id + self.msh.dt * (-self.D + self.R), c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()), x0=c_old, tol=self.tol_inversion, ) # inverse the linear system resulting the implicit time discretization using a gmres method for the current time step elif self.time_discretization == 'implicit': c, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A), c_old + self.msh.dt * self.S.value.ravel(), x0=c_old, tol=self.tol_inversion, ) elif self.time_discretization == 'implicit with matrix inversion': c = self.inv_matrix_implicit_scheme[it, :, :].dot(c_old + self.msh.dt * self.S.value.ravel()) info = 0 elif self.time_discretization == 'implicit with stationnary matrix inversion': RHS = c_old + self.msh.dt * self.S.value.ravel() if not np.linalg.norm(RHS, ord=np.inf) < 1e-16: LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A)).matvec(c_old) flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS) # print(np.linalg.norm(RHS),np.linalg.norm(RHS, ord=np.inf)) if flag_residu: c = np.dot(self.inv_matrix_implicit_scheme, RHS) else: c = np.zeros_like(c) # print("the norm is 0") # c = np.dot(self.inv_matrix_implicit_scheme, c_old + self.msh.dt * self.S.value.ravel()) info = 0 if info > 0: raise ValueError( "The algorithme used to solve the linear system has not converge" + "to the expected tolerance or within the maximum number of iteration." ) if info < 0: raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.") it += 1 if it in obs.index_obs_to_index_time_est: c_prov = c.reshape((self.msh.y.shape[0], self.msh.x.shape[0])) for index_obs in obs.index_time_est_to_index_obs[it]: i = np.where(obs.index_obs_to_index_time_est[index_obs, :] == it) index_x_est = np.argmin(np.abs(self.msh.x - obs.X_obs[index_obs, 0])) index_y_est = np.argmin(np.abs(self.msh.y - obs.X_obs[index_obs, 1])) obs.c_est[index_obs, i] = c_prov[index_y_est, index_x_est]
[docs] def solver_steady_state(self): """ solve the steady-state version of the PDE inverse the linear system (- D + A + R) c = S using a conjugate gradient method - output: * numpy array of shape (msh.y.size, msh.x.size), resulting concentration at the steady state """ c_ravel, tol = cg(-self.D + self.A + self.R, self.S.value.ravel()) return c_ravel.reshape((self.msh.y.shape[0], self.msh.x.shape[0]))