Source code for source_localization.obs

import numpy as np
import scipy.sparse as sps

"""
Module that contains all the features related to the observations and the sensors
"""


[docs] class Obs: r""" Class containing all the features related to the sensors and the observations. This includes: - the observations :math:`m_{obs}`, the observation times :math:`t_{obs}` and the observation positions :math:`X_{obs}=(x_{obs}, y_{obs})`, - the estimation of the observed variable :math:`m(c(s))`, - the concentration of pheromone :math:`c(s)` at the time and positions required to compute the observed variable, - the observation operator :math:`c\mapsto m(c)` - the adjoint operator of its derivative with respect of the state variable :math:`\phi\mapsto (d_cm(c))^*\cdot\phi`, - the covariance matrix of the observation error :math:`\mathbf{R}`. In this class, the observations are the accumulation of pheromone over a given time window. Therefore, the observation operator is the integral of the pheromone concentration over this time window: .. math:: m(c)_{i}=\int_{t_{obs, i}-\delta t}^{t_{obs, i}}c(x_{obs, i}, y_{obs, i}, \tau)d\tau Attributes ---------- msh : ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. t_obs : ~numpy.ndarray The observation times :math:`t_{obs}`. X_obs : ~numpy.ndarray The locations of the observations :math:`X_{obs}=(x_{obs}, y_{obs})`. d_obs : ~numpy.ndarray The observations data :math:`m_{obs}`. N_obs : int The number of observations. nb_sensors : int The number of sensors. X_sensors : ~numpy.ndarray The positions of the sensors. dt_obs_operator : float Length of the time window of accumulation :math:`\delta t`. If the observation is instantaneous, set to `0`. nb_dt_in_obs_time_window : int Number of time steps covering the time window of accumulation :math:`[t_{obs, i}-\delta t; t_{obs, i}]`. index_obs_to_index_time_est : ~numpy.ndarray The indexes of the times at which the state variable is required to compute the observation variable given the index of an observation. index_time_est_to_index_obs : dict Dictionary where each key is a time index, and the value is an array of indexes of the observations that require the estimate of the state variable at this time to estimate the observed variable. c_est : ~numpy.ndarray The estimation the concentration of pheromone :math:`c(s;x,y,t)` at the times :math:`t` and positions :math:`(x,y)` required to compute the observed variable :math:`m(c(s))`. d_est : ~numpy.ndarray The estimation of the accumulation of pheromone (observed variable) computed using the pheromone propagation model :math:`m(c(s))`. R_inv : :mod:`~scipy.sparse` matrix, default: :func:`~scipy.sparse.identity` matrix The inverse of the covariance matrix of the observation error :math:`\mathbf{R}^{-1}`. """
[docs] def __init__(self, t_obs, X_obs, d_obs, msh, dt_obs_operator=0.0): r""" Constructor method. Parameters ---------- t_obs : ~numpy.ndarray The observation times :math:`t_{obs}`. X_obs : ~numpy.ndarray The locations of the observations :math:`X_{obs}=(x_{obs}, y_{obs})`. d_obs : ~numpy.ndarray The observations data :math:`m_{obs}`. msh : ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. dt_obs_operator : float, default: 0 Length of the time window of accumulation :math:`\delta t`. If set to `0`, the observation is instantaneous. Raises ------ ValueError if the time array has not been initialized using the methods :func:`~pheromone_dispersion.geom.MeshRect2D.calc_dt_explicit_solver` or :func:`~pheromone_dispersion.geom.MeshRect2D.calc_dt_implicit_solver`. ValueError if the sizes of the arrays containing the observation times :math:`t_{obs}`, the observation positions :math:`X_{obs}` and the observations data :math:`m_{obs}` do not coincide. ValueError if one observation time :math:`t_{obs}` or one observation position :math:`X_{obs}` is out of the time window :math:`[0;T]` or domain :math:`\Omega`. """ # To do: # update the documentation and comments for the new observation operator # add an exception to make sure that the accumulation time window does not overlapp for a given sensor # add an exception to make sure that no observations are made after self.msh = msh self.t_obs = np.copy(t_obs) self.X_obs = np.copy(X_obs) self.d_obs = np.copy(d_obs) self.N_obs = np.size(d_obs) if self.msh.t_array is None: raise ValueError( "The array containing the time at which the direct model is solved, has not been correctly initialized." + " This is probably because the time step has not been computed so the CFL condition is satisfied." + " The method calc_CFL of the class MeshRect2D should then be executed first." ) if d_obs.ndim != 1: raise ValueError("The array of the observations should have just one dimension, it should not be a matrix.") if t_obs.shape != (self.N_obs,): raise ValueError( "The number of observations and the number of observations time are not the same." + " Either the vector of observations or the vector of time are incomplete." ) if np.max(t_obs) > np.max(self.msh.t_array) or np.min(t_obs) < np.min(self.msh.t_array): raise ValueError( "At least one of the observations is made out of the temporal domain." + " Make sure that the observation times are inside the modelling time window." ) if X_obs.shape != (self.N_obs, 2): raise ValueError( " The number of observations and the number of location of observations are not the same." + " Either the vector of observations or the vector of location are incomplete." ) if ( np.max(self.X_obs[:, 0]) > np.max(self.msh.x_vertical_interface) or np.min(self.X_obs[:, 0]) < np.min(self.msh.x_vertical_interface) or np.max(self.X_obs[:, 1]) > np.max(self.msh.y_horizontal_interface) or np.min(self.X_obs[:, 1]) < np.min(self.msh.y_horizontal_interface) ): raise ValueError( "At least one of the observations is made out of the spatial domain." + " Make sure that the observation location are inside the domain." ) # initialization of the vector containing the position of the sensors self.X_sensors = np.unique(self.X_obs, axis=0) self.nb_sensors = np.shape(self.X_sensors)[0] self.dt_obs_operator = dt_obs_operator self.nb_dt_in_obs_time_window = int((self.dt_obs_operator // self.msh.dt) + 1) # self.nb_dt_in_obs_time_window = int(np.ceil(dt_obs_operator / msh.dt)) # Exception to make sure that the accumulation time windows do not overlapp for a given sensor for X_sensor in self.X_sensors: indexes_sensor = [x == X_sensor[0] and y == X_sensor[1] for x, y in zip(self.X_obs[:, 0], self.X_obs[:, 1])] # indexes_sensor = np.where(self.X_obs==X_sensor) ts_obs_sensor = self.t_obs[indexes_sensor] for t_obs_sensor in ts_obs_sensor: if (np.abs(ts_obs_sensor[ts_obs_sensor != t_obs_sensor] - t_obs_sensor) < self.dt_obs_operator).any(): raise ValueError( "The time windows of integration for two different data of a same sensors are overlapping." + "The implemented observation operator supposes that these time windows are not overlapping" ) self.c_est = np.full((self.N_obs, self.nb_dt_in_obs_time_window), np.nan) self.d_est = np.full((self.N_obs,), np.nan) self.R_inv = sps.identity(self.N_obs, dtype=int) # initialization of the vector containing the time index given the index of an observation self.index_obs_to_index_time_est = np.full((self.N_obs, self.nb_dt_in_obs_time_window), np.nan) self.index_obs_to_index_time_est[:, 0] = np.argmin( np.abs(self.t_obs.reshape((self.N_obs, 1)) - msh.t_array.reshape((1, msh.t_array.size))), axis=1 ) for i in range(1, self.nb_dt_in_obs_time_window): if (self.index_obs_to_index_time_est[:, i - 1] <= 0).any(): raise ValueError( "The time window of integration of the observation operator for the first observation" + " starts before the temporal domain." + " This is probably because the integration time window is too large or the observation times are inaccurate." ) self.index_obs_to_index_time_est[:, i] = self.index_obs_to_index_time_est[:, i - 1] - 1 # initialization of the dictionnary containing the indexes of the observations given the time index self.index_time_est_to_index_obs = {} for i in range(self.nb_dt_in_obs_time_window): for increment, time_idx in enumerate(self.index_obs_to_index_time_est[:, i]): if time_idx in self.index_time_est_to_index_obs.keys(): self.index_time_est_to_index_obs[time_idx].append(increment) else: self.index_time_est_to_index_obs[time_idx] = [increment]
[docs] def obs_operator(self): r""" The observation operator :math:`c\mapsto m(c)`. In this class, the observation operator is, for the :math:`i^{th}` observation time and position, :math:`m(c)_{i}=\int_{t_{obs, i}-\delta t}^{t_{obs, i}}c(x_{obs, i}, y_{obs, i}, \tau)d\tau`. Moreover, it is assumed that the integration time window for different data of a same sensor are not overlapping Notes ----- This method computes the observed variable and store the values in the attribute :attr:`d_est` using the attribute :attr:`c_est` that contains the estimation the concentration of pheromone :math:`c(s;x,y,t)` computed by the pheromone propagation model at the times :math:`t` and positions :math:`(x,y)` required to compute the observed variable. To get these :math:`c(s;x,y,t)`, the attribute :attr:`c_est` should be computed in a first time using the method :meth:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation.solver_est_at_obs_times` of the class :meth:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation`. Raises ------ ValueError if the attribute :attr:`c_est` has not been computed in a first time. """ if np.array([np.isnan(self.c_est[i]) for i in range(self.N_obs)]).any(): raise ValueError( "The vector of estimation of the state variable at the observations time and location has not been computed." + " The method solver_est_at_obs_times should be used to computed this vector." ) self.d_est = np.sum(self.c_est, axis=1) * self.msh.dt
[docs] def onesensor_adjoint_derivative_obs_operator(self, t, phi, index_sensor): r""" compute the spatial map of the adjoint of the derivative of the one-sensor observation operator with respect to the state variable :math:`\phi\mapsto (d_cm_i(c))^*\cdot\phi`, at a given time :math:`t`, for the :math:`i^{th}` sensor and for an element of the observation space :math:`\phi`. Parameters ---------- t : float or integer The current time :math:`t`. phi : ~numpy.ndarray The element of the observation space :math:`\phi` at current time :math:`t`. index_sensor: int The index of the sensor :math:`i`. Returns ------- ~numpy.ndarray The spatial map of the image of the adjoint of the derivative of the one-sensor observation operator :math:`(d_cm_i(c))^*\cdot\phi(x,y)` at the given time raveled into a vector. """ index_t = np.argmin(np.abs(t - self.msh.t_array)) out = np.zeros((self.msh.y.size, self.msh.x.size)) X_sensor = self.X_sensors[index_sensor] if index_t in self.index_obs_to_index_time_est: index_obs_current_t = self.index_time_est_to_index_obs[index_t] X_obs_current_t = self.X_obs[index_obs_current_t] if X_sensor in X_obs_current_t: index = np.argmin(np.abs(X_obs_current_t[:, 0] - X_sensor[0]) + np.abs(X_obs_current_t[:, 1] - X_sensor[1])) index_obs = index_obs_current_t[index] index_x_est = np.argmin(np.abs(self.msh.x - X_sensor[0])) index_y_est = np.argmin(np.abs(self.msh.y - X_sensor[1])) out[index_y_est, index_x_est] = phi[index_obs] * self.msh.dt return out.reshape((self.msh.y.size * self.msh.x.size,))
[docs] def adjoint_derivative_obs_operator(self, t, phi): r""" compute the spatial map of the adjoint of the derivative of the observation operator with respect to the state variable :math:`\phi\mapsto (d_cm(c))^*\cdot\phi`, at a given time :math:`t` and for an element of the observation space :math:`\phi`. Parameters ---------- t : float or integer The current time :math:`t`. phi : ~numpy.ndarray The element of the observation space :math:`\phi` at current time :math:`t`. Returns ------- ~numpy.ndarray The spatial map of the image of the adjoint of the derivative of the observation operator :math:`(d_cm(c))^*\cdot\phi(x,y)` at the given time raveled into a vector. """ index_t = np.argmin(np.abs(t - self.msh.t_array)) out = np.zeros((self.msh.y.size, self.msh.x.size)) # update only if an observation is made at the current time # if no observation is made at the current time, the attribute index_x_est is set to [] if index_t in self.index_obs_to_index_time_est: # initialization of index_y_est index_x_est = [] index_y_est = [] # extracting the indexes of the observations made at the current time index_obs = self.index_time_est_to_index_obs[index_t] # looping on the observations made at the current time and storing the indexes of the coordinates of the estimate points for i in index_obs: index_x_est.append(np.argmin(np.abs(self.msh.x - self.X_obs[i, 0]))) index_y_est.append(np.argmin(np.abs(self.msh.y - self.X_obs[i, 1]))) # for each observation points, store the associated input at the correponding estimate point for i_x, i_y, i_obs in zip(index_x_est, index_y_est, index_obs): out[i_y, i_x] = phi[i_obs] * self.msh.dt return out.reshape((self.msh.y.size * self.msh.x.size,))