import numpy as np
from scipy.sparse.linalg import LinearOperator as LinOp
from pheromone_dispersion.velocity import Velocity
"""
Module that contains the implementation of the linear operator of the advection term of the pheromone propagation model.
"""
[docs]
def sum_advection_flux_given_U(U, x, msh):
r"""
Compute at each cells the sum of the flux of an advection term
:math:`A:c\mapsto \nabla\cdot(\vec{u}c)~\forall (x,y)\in\Omega~\forall t\in]0;T]`
with the related boundary condition
:math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}<0\}~\forall t\in]0;T]`
for a given velocity field :math:`\vec{u}` at the current time.
Parameters
----------
U : ~pheromone_dispersion.velocity.Velocity
The wind field :math:`\vec{u}(x,y,t)`.
x : ~numpy.ndarray
The concentration of pheromones map :math:`c(x,y)` at the current time :math:`t` raveled into a vector.
msh : ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
Returns
-------
~numpy.ndarray
the sum of the flux at each cells at the current time.
"""
# To do:
# change the BC
# for now: no income
# change for: a given income
shift_vertical = np.zeros((msh.y_horizontal_interface.size, msh.x.size))
flux_vertical = np.zeros((msh.y_horizontal_interface.size, msh.x.size))
# Computation of the y-component of the flux
shift_vertical[:-1, :] = x[:, :] # temporary value on the boundary : boundary conditions are dealt with later
flux_vertical[U.cell_above_upwind] = U.at_horizontal_interface[:, :, 1][U.cell_above_upwind] * shift_vertical[U.cell_above_upwind]
shift_vertical[1:, :] = x[:, :]
shift_vertical[0, :] = 0 # temporary value on the boundary : boundary conditions are dealt with later
flux_vertical[U.cell_under_upwind] += U.at_horizontal_interface[:, :, 1][U.cell_under_upwind] * shift_vertical[U.cell_under_upwind]
# Boundary Condition on the y-component of the flux
flux_vertical[-1, U.cell_above_upwind[-1, :]] = 0
flux_vertical[0, U.cell_under_upwind[0, :]] = 0
shift_horizontal = np.zeros((msh.y.size, msh.x_vertical_interface.size))
flux_horizontal = np.zeros((msh.y.size, msh.x_vertical_interface.size))
# Computation of the x-component of the flux
shift_horizontal[:, :-1] = x[:, :] # temporary value on the boundary : boundary conditions are dealt with later
flux_horizontal[U.cell_right_upwind] = (
U.at_vertical_interface[:, :, 0][U.cell_right_upwind] * shift_horizontal[U.cell_right_upwind]
) # transport
shift_horizontal[:, 1:] = x[:, :]
shift_horizontal[:, 0] = 0 # temporary value on the boundary : boundary conditions are dealt with later
flux_horizontal[U.cell_left_upwind] += (
U.at_vertical_interface[:, :, 0][U.cell_left_upwind] * shift_horizontal[U.cell_left_upwind]
) # transport
# Boundary Condition on the x-component of the flux
flux_horizontal[U.cell_right_upwind[:, -1], -1] = 0
flux_horizontal[U.cell_left_upwind[:, 0], 0] = 0
# Computation of the flux over the border of the cell for a cartesian mesh
sum_faces = (
msh.dy * (flux_horizontal[:, 1:] - flux_horizontal[:, :-1]) + msh.dx * (flux_vertical[1:, :] - flux_vertical[:-1, :])
) / msh.mass_cell
return sum_faces
[docs]
class Advection(LinOp):
r"""
Class containing the advection term linear operator :math:`A:c\mapsto \nabla\cdot(\vec{u}c)~\forall (x,y)\in\Omega~\forall t\in]0;T]`
with the related boundary condition
:math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}<0\}~\forall t\in]0;T]`.
The implementation of this linear operator is a subclass of the :py:class:`~scipy.sparse.linalg.LinearOperator` class.
Attributes
----------
U : ~pheromone_dispersion.velocity.Velocity
The wind field :math:`\vec{u}(x,y,t)`.
msh : ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
minus_U : ~pheromone_dispersion.velocity.Velocity
The opposite sign wind field :math:`-\vec{u}(x,y,t)`.
shape : tuple of int
Shape of the matrix of the linear operator.
The shape is (:attr:`msh.y.size` * :attr:`msh.x.size`, :attr:`msh.y.size` * :attr:`msh.x.size`)
dtype : ~numpy.dtype
Data type of the elements of the matrix of the linear operator.
The type is `float64`
"""
[docs]
def __init__(self, U, msh):
r"""
Constructor method
Parameters
----------
U : ~pheromone_dispersion.velocity.Velocity
The wind field :math:`\vec{u}(x,y,t)`.
msh : ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
"""
self.U = U
if U.t is None:
self.minus_U = Velocity(msh, -U.at_vertical_interface, -U.at_horizontal_interface)
else:
minus_U_at_vertical_interface = np.zeros((U.t.size, U.at_vertical_interface.shape[0], U.at_vertical_interface.shape[1], 2))
minus_U_at_horizontal_interface = np.zeros(
(U.t.size, U.at_horizontal_interface.shape[0], U.at_horizontal_interface.shape[1], 2)
)
for it, tc in enumerate(U.t):
U.at_current_time(tc)
minus_U_at_vertical_interface[it, :, :, :] = -U.at_vertical_interface
minus_U_at_horizontal_interface[it, :, :, :] = -U.at_horizontal_interface
self.minus_U = Velocity(msh, minus_U_at_vertical_interface, minus_U_at_horizontal_interface, t=U.t)
U.at_current_time(U.t[0])
self.msh = msh
self.shape = (self.msh.y.size * self.msh.x.size, self.msh.y.size * self.msh.x.size)
self.dtype = np.dtype(float)
[docs]
def at_current_time(self, tc):
r"""
Update the attributes :attr:`U` and :attr:`minus_U` at a given time.
Parameters
----------
tc : float or integer
The current time.
Notes
-----
Updates the wind fields :attr:`U` and :attr:`minus_U` and their own attributes
using the method :meth:`~pheromone_dispersion.velocity.Velocity.at_current_time`
of the class :py:class:`~pheromone_dispersion.velocity.Velocity`.
"""
self.U.at_current_time(tc)
self.minus_U.at_current_time(tc)
[docs]
def _matvec(self, x_out):
r"""
Compute the image (matrix-vector product) of :math:`A:c\mapsto \nabla\cdot(\vec{u}c)`
with the related boundary condition
for a given concentration map :math:`c` at the current time.
Parameters
----------
x_out : ~numpy.ndarray
The map of concentration of pheromones :math:`c(x,y)` at a given time :math:`t` raveled into a vector.
Returns
-------
~numpy.ndarray
Array containing the image :math:`\nabla\cdot(\vec{u}c)`.
Notes
-----
The input :math:`c` has to be raveled into a (:attr:`msh.y.size` * :attr:`msh.x.size`,)-shape array
to match the format of the :py:class:`~scipy.sparse.linalg.LinearOperator` class.
The same way, the ouput can be reshape into a (:attr:`msh.y.size`, :attr:`msh.x.size`)-shape array
to get the map :math:`Ac(x,y)` at the current time :math:`t`
"""
# reshape the vector (of (msh.x.size*msh.y.size, )) containg the concentration into a matrix (of shape (msh.y.size, msh.x.size))
x = x_out.reshape((self.msh.y.size, self.msh.x.size)) # unknown of the PDE
sum_faces = sum_advection_flux_given_U(self.U, x, self.msh)
return sum_faces.reshape((self.msh.y.size * self.msh.x.size,))
[docs]
def _rmatvec(self, x_out):
r"""
Compute the image (matrix-vector product) of :math:`A^*:c^*\mapsto - \nabla\cdot(\vec{u}c^*)+(\nabla.\vec{u})c^*`
with the related boundary condition
for the map of a given element of the adjoint state space at the current time.
Parameters
----------
x_out : ~numpy.ndarray
The map of an element of the adjoint state space :math:`c^*(x,y)` at a given time :math:`t` raveled into a vector.
Returns
-------
~numpy.ndarray
Array containing the image :math:`- \nabla\cdot(\vec{u}c^*)+(\nabla.\vec{u})c^*`.
Notes
-----
The input :math:`c^*` has to be raveled into a (:attr:`msh.y.size` * :attr:`msh.x.size`,)-shape array
to match the format of the :py:class:`~scipy.sparse.linalg.LinearOperator` class.
The same way, the ouput can be reshape into a (:attr:`msh.y.size`, :attr:`msh.x.size`)-shape array
to get the map :math:`A^*c^*(x,y)` at the current time :math:`t`.
"""
# reshape the vector (of (msh.x.size*msh.y.size, )) containg the concentration into a matrix (of shape (msh.y.size, msh.x.size))
x = x_out.reshape((self.msh.y.size, self.msh.x.size)) # unknown of the PDE
sum_faces = sum_advection_flux_given_U(self.minus_U, x, self.msh) + self.U.div * x
return sum_faces.reshape((self.msh.y.size * self.msh.x.size,))
def _matmat(self, x_out):
# Compute the image (matrix-matrix product)
# using a naive method based on the _matvec method
return np.hstack([self.matvec(col.reshape(-1, 1)) for col in x_out.T])