import numpy as np
import scipy.sparse as sps
"""
Module that contains all the features related to the observations and the sensors
"""
[docs]
class Obs:
r"""
Class containing all the features related to the sensors and the observations.
This includes:
- the observations :math:`m_{obs}`,
the observation times :math:`t_{obs}` and
the observation positions :math:`X_{obs}=(x_{obs}, y_{obs})`,
- the estimation of the observed variable :math:`m(c(s))`,
- the concentration of pheromone :math:`c(s)` at the time and positions required to compute the observed variable,
- the observation operator :math:`c\mapsto m(c)`
- the adjoint operator of its derivative with respect of the state variable :math:`\phi\mapsto (d_cm(c))^*\cdot\phi`,
- the covariance matrix of the observation error :math:`\mathbf{R}`.
In this class, the observations are the accumulation of pheromone over a given time window.
Therefore, the observation operator is
the integral of the pheromone concentration over this time window:
.. math::
m(c)_{i}=\int_{t_{obs, i}-\delta t}^{t_{obs, i}}c(x_{obs, i}, y_{obs, i}, \tau)d\tau
Attributes
----------
msh : ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
t_obs : ~numpy.ndarray
The observation times :math:`t_{obs}`.
X_obs : ~numpy.ndarray
The locations of the observations :math:`X_{obs}=(x_{obs}, y_{obs})`.
d_obs : ~numpy.ndarray
The observations data :math:`m_{obs}`.
N_obs : int
The number of observations.
nb_sensors : int
The number of sensors.
X_sensors : ~numpy.ndarray
The positions of the sensors.
dt_obs_operator : float
Length of the time window of accumulation :math:`\delta t`.
If the observation is instantaneous, set to `0`.
nb_dt_in_obs_time_window : int
Number of time steps covering the time window of accumulation :math:`[t_{obs, i}-\delta t; t_{obs, i}]`.
index_obs_to_index_time_est : ~numpy.ndarray
The indexes of the times
at which the state variable is required to compute the observation variable given the index of an observation.
index_time_est_to_index_obs : dict
Dictionary where each key is a time index, and the value is an array of indexes of the observations that
require the estimate of the state variable at this time to estimate the observed variable.
c_est : ~numpy.ndarray
The estimation the concentration of pheromone :math:`c(s;x,y,t)`
at the times :math:`t` and positions :math:`(x,y)`
required to compute the observed variable :math:`m(c(s))`.
d_est : ~numpy.ndarray
The estimation of the accumulation of pheromone (observed variable)
computed using the pheromone propagation model :math:`m(c(s))`.
R_inv : :mod:`~scipy.sparse` matrix, default: :func:`~scipy.sparse.identity` matrix
The inverse of the covariance matrix of the observation error :math:`\mathbf{R}^{-1}`.
"""
[docs]
def __init__(self, t_obs, X_obs, d_obs, msh, dt_obs_operator=0.0):
r"""
Constructor method.
Parameters
----------
t_obs : ~numpy.ndarray
The observation times :math:`t_{obs}`.
X_obs : ~numpy.ndarray
The locations of the observations :math:`X_{obs}=(x_{obs}, y_{obs})`.
d_obs : ~numpy.ndarray
The observations data :math:`m_{obs}`.
msh : ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
dt_obs_operator : float, default: 0
Length of the time window of accumulation :math:`\delta t`.
If set to `0`, the observation is instantaneous.
Raises
------
ValueError
if the time array has not been initialized using
the methods :func:`~pheromone_dispersion.geom.MeshRect2D.calc_dt_explicit_solver`
or :func:`~pheromone_dispersion.geom.MeshRect2D.calc_dt_implicit_solver`.
ValueError
if the sizes of
the arrays containing the observation times :math:`t_{obs}`,
the observation positions :math:`X_{obs}` and
the observations data :math:`m_{obs}`
do not coincide.
ValueError
if one observation time :math:`t_{obs}`
or one observation position :math:`X_{obs}`
is out of the time window :math:`[0;T]` or domain :math:`\Omega`.
"""
# To do:
# update the documentation and comments for the new observation operator
# add an exception to make sure that the accumulation time window does not overlapp for a given sensor
# add an exception to make sure that no observations are made after
self.msh = msh
self.t_obs = np.copy(t_obs)
self.X_obs = np.copy(X_obs)
self.d_obs = np.copy(d_obs)
self.N_obs = np.size(d_obs)
if self.msh.t_array is None:
raise ValueError(
"The array containing the time at which the direct model is solved, has not been correctly initialized."
+ " This is probably because the time step has not been computed so the CFL condition is satisfied."
+ " The method calc_CFL of the class MeshRect2D should then be executed first."
)
if d_obs.ndim != 1:
raise ValueError("The array of the observations should have just one dimension, it should not be a matrix.")
if t_obs.shape != (self.N_obs,):
raise ValueError(
"The number of observations and the number of observations time are not the same."
+ " Either the vector of observations or the vector of time are incomplete."
)
if np.max(t_obs) > np.max(self.msh.t_array) or np.min(t_obs) < np.min(self.msh.t_array):
raise ValueError(
"At least one of the observations is made out of the temporal domain."
+ " Make sure that the observation times are inside the modelling time window."
)
if X_obs.shape != (self.N_obs, 2):
raise ValueError(
" The number of observations and the number of location of observations are not the same."
+ " Either the vector of observations or the vector of location are incomplete."
)
if (
np.max(self.X_obs[:, 0]) > np.max(self.msh.x_vertical_interface)
or np.min(self.X_obs[:, 0]) < np.min(self.msh.x_vertical_interface)
or np.max(self.X_obs[:, 1]) > np.max(self.msh.y_horizontal_interface)
or np.min(self.X_obs[:, 1]) < np.min(self.msh.y_horizontal_interface)
):
raise ValueError(
"At least one of the observations is made out of the spatial domain."
+ " Make sure that the observation location are inside the domain."
)
# initialization of the vector containing the position of the sensors
self.X_sensors = np.unique(self.X_obs, axis=0)
self.nb_sensors = np.shape(self.X_sensors)[0]
self.dt_obs_operator = dt_obs_operator
self.nb_dt_in_obs_time_window = int((self.dt_obs_operator // self.msh.dt) + 1)
# self.nb_dt_in_obs_time_window = int(np.ceil(dt_obs_operator / msh.dt))
# Exception to make sure that the accumulation time windows do not overlapp for a given sensor
for X_sensor in self.X_sensors:
indexes_sensor = [x == X_sensor[0] and y == X_sensor[1] for x, y in zip(self.X_obs[:, 0], self.X_obs[:, 1])]
# indexes_sensor = np.where(self.X_obs==X_sensor)
ts_obs_sensor = self.t_obs[indexes_sensor]
for t_obs_sensor in ts_obs_sensor:
if (np.abs(ts_obs_sensor[ts_obs_sensor != t_obs_sensor] - t_obs_sensor) < self.dt_obs_operator).any():
raise ValueError(
"The time windows of integration for two different data of a same sensors are overlapping."
+ "The implemented observation operator supposes that these time windows are not overlapping"
)
self.c_est = np.full((self.N_obs, self.nb_dt_in_obs_time_window), np.nan)
self.d_est = np.full((self.N_obs,), np.nan)
self.R_inv = sps.identity(self.N_obs, dtype=int)
# initialization of the vector containing the time index given the index of an observation
self.index_obs_to_index_time_est = np.full((self.N_obs, self.nb_dt_in_obs_time_window), np.nan)
self.index_obs_to_index_time_est[:, 0] = np.argmin(
np.abs(self.t_obs.reshape((self.N_obs, 1)) - msh.t_array.reshape((1, msh.t_array.size))), axis=1
)
for i in range(1, self.nb_dt_in_obs_time_window):
if (self.index_obs_to_index_time_est[:, i - 1] <= 0).any():
raise ValueError(
"The time window of integration of the observation operator for the first observation"
+ " starts before the temporal domain."
+ " This is probably because the integration time window is too large or the observation times are inaccurate."
)
self.index_obs_to_index_time_est[:, i] = self.index_obs_to_index_time_est[:, i - 1] - 1
# initialization of the dictionnary containing the indexes of the observations given the time index
self.index_time_est_to_index_obs = {}
for i in range(self.nb_dt_in_obs_time_window):
for increment, time_idx in enumerate(self.index_obs_to_index_time_est[:, i]):
if time_idx in self.index_time_est_to_index_obs.keys():
self.index_time_est_to_index_obs[time_idx].append(increment)
else:
self.index_time_est_to_index_obs[time_idx] = [increment]
[docs]
def obs_operator(self):
r"""
The observation operator :math:`c\mapsto m(c)`.
In this class, the observation operator is,
for the :math:`i^{th}` observation time and position,
:math:`m(c)_{i}=\int_{t_{obs, i}-\delta t}^{t_{obs, i}}c(x_{obs, i}, y_{obs, i}, \tau)d\tau`.
Moreover, it is assumed that the integration time window for different data of a same sensor are not overlapping
Notes
-----
This method computes the observed variable and
store the values in the attribute :attr:`d_est`
using the attribute :attr:`c_est`
that contains the estimation the concentration of pheromone :math:`c(s;x,y,t)`
computed by the pheromone propagation model
at the times :math:`t` and positions :math:`(x,y)` required to compute the observed variable.
To get these :math:`c(s;x,y,t)`,
the attribute :attr:`c_est` should be computed in a first time
using the method :meth:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation.solver_est_at_obs_times`
of the class :meth:`~pheromone_dispersion.convection_diffusion_2D.DiffusionConvectionReaction2DEquation`.
Raises
------
ValueError
if the attribute :attr:`c_est` has not been computed in a first time.
"""
if np.array([np.isnan(self.c_est[i]) for i in range(self.N_obs)]).any():
raise ValueError(
"The vector of estimation of the state variable at the observations time and location has not been computed."
+ " The method solver_est_at_obs_times should be used to computed this vector."
)
self.d_est = np.sum(self.c_est, axis=1) * self.msh.dt
[docs]
def onesensor_adjoint_derivative_obs_operator(self, t, phi, index_sensor):
r"""
compute the spatial map
of the adjoint of the derivative of the one-sensor observation operator
with respect to the state variable
:math:`\phi\mapsto (d_cm_i(c))^*\cdot\phi`,
at a given time :math:`t`,
for the :math:`i^{th}` sensor and
for an element of the observation space :math:`\phi`.
Parameters
----------
t : float or integer
The current time :math:`t`.
phi : ~numpy.ndarray
The element of the observation space :math:`\phi` at current time :math:`t`.
index_sensor: int
The index of the sensor :math:`i`.
Returns
-------
~numpy.ndarray
The spatial map of the image
of the adjoint of the derivative of the one-sensor observation operator
:math:`(d_cm_i(c))^*\cdot\phi(x,y)`
at the given time
raveled into a vector.
"""
index_t = np.argmin(np.abs(t - self.msh.t_array))
out = np.zeros((self.msh.y.size, self.msh.x.size))
X_sensor = self.X_sensors[index_sensor]
if index_t in self.index_obs_to_index_time_est:
index_obs_current_t = self.index_time_est_to_index_obs[index_t]
X_obs_current_t = self.X_obs[index_obs_current_t]
if X_sensor in X_obs_current_t:
index = np.argmin(np.abs(X_obs_current_t[:, 0] - X_sensor[0]) + np.abs(X_obs_current_t[:, 1] - X_sensor[1]))
index_obs = index_obs_current_t[index]
index_x_est = np.argmin(np.abs(self.msh.x - X_sensor[0]))
index_y_est = np.argmin(np.abs(self.msh.y - X_sensor[1]))
out[index_y_est, index_x_est] = phi[index_obs] * self.msh.dt
return out.reshape((self.msh.y.size * self.msh.x.size,))
[docs]
def adjoint_derivative_obs_operator(self, t, phi):
r"""
compute the spatial map of
the adjoint of the derivative of the observation operator with respect to the state variable
:math:`\phi\mapsto (d_cm(c))^*\cdot\phi`,
at a given time :math:`t`
and for an element of the observation space :math:`\phi`.
Parameters
----------
t : float or integer
The current time :math:`t`.
phi : ~numpy.ndarray
The element of the observation space :math:`\phi` at current time :math:`t`.
Returns
-------
~numpy.ndarray
The spatial map of the image of the adjoint of the derivative of the observation operator
:math:`(d_cm(c))^*\cdot\phi(x,y)`
at the given time
raveled into a vector.
"""
index_t = np.argmin(np.abs(t - self.msh.t_array))
out = np.zeros((self.msh.y.size, self.msh.x.size))
# update only if an observation is made at the current time
# if no observation is made at the current time, the attribute index_x_est is set to []
if index_t in self.index_obs_to_index_time_est:
# initialization of index_y_est
index_x_est = []
index_y_est = []
# extracting the indexes of the observations made at the current time
index_obs = self.index_time_est_to_index_obs[index_t]
# looping on the observations made at the current time and storing the indexes of the coordinates of the estimate points
for i in index_obs:
index_x_est.append(np.argmin(np.abs(self.msh.x - self.X_obs[i, 0])))
index_y_est.append(np.argmin(np.abs(self.msh.y - self.X_obs[i, 1])))
# for each observation points, store the associated input at the correponding estimate point
for i_x, i_y, i_obs in zip(index_x_est, index_y_est, index_obs):
out[i_y, i_x] = phi[i_obs] * self.msh.dt
return out.reshape((self.msh.y.size * self.msh.x.size,))