Source code for pheromone_dispersion.advection_operator

import numpy as np
from scipy.sparse.linalg import LinearOperator as LinOp

from pheromone_dispersion.velocity import Velocity

"""
Module that contains the implementation of the linear operator of the advection term of the pheromone propagation model.
"""


[docs] def sum_advection_flux_given_U(U, x, msh): r""" Compute at each cells the sum of the flux of an advection term :math:`A:c\mapsto \nabla\cdot(\vec{u}c)~\forall (x,y)\in\Omega~\forall t\in]0;T]` with the related boundary condition :math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}<0\}~\forall t\in]0;T]` for a given velocity field :math:`\vec{u}` at the current time. Parameters ---------- U : ~pheromone_dispersion.velocity.Velocity The wind field :math:`\vec{u}(x,y,t)`. x : ~numpy.ndarray The concentration of pheromones map :math:`c(x,y)` at the current time :math:`t` raveled into a vector. msh : ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. Returns ------- ~numpy.ndarray the sum of the flux at each cells at the current time. """ # To do: # change the BC # for now: no income # change for: a given income shift_vertical = np.zeros((msh.y_horizontal_interface.size, msh.x.size)) flux_vertical = np.zeros((msh.y_horizontal_interface.size, msh.x.size)) # Computation of the y-component of the flux shift_vertical[:-1, :] = x[:, :] # temporary value on the boundary : boundary conditions are dealt with later flux_vertical[U.cell_above_upwind] = U.at_horizontal_interface[:, :, 1][U.cell_above_upwind] * shift_vertical[U.cell_above_upwind] shift_vertical[1:, :] = x[:, :] shift_vertical[0, :] = 0 # temporary value on the boundary : boundary conditions are dealt with later flux_vertical[U.cell_under_upwind] += U.at_horizontal_interface[:, :, 1][U.cell_under_upwind] * shift_vertical[U.cell_under_upwind] # Boundary Condition on the y-component of the flux flux_vertical[-1, U.cell_above_upwind[-1, :]] = 0 flux_vertical[0, U.cell_under_upwind[0, :]] = 0 shift_horizontal = np.zeros((msh.y.size, msh.x_vertical_interface.size)) flux_horizontal = np.zeros((msh.y.size, msh.x_vertical_interface.size)) # Computation of the x-component of the flux shift_horizontal[:, :-1] = x[:, :] # temporary value on the boundary : boundary conditions are dealt with later flux_horizontal[U.cell_right_upwind] = ( U.at_vertical_interface[:, :, 0][U.cell_right_upwind] * shift_horizontal[U.cell_right_upwind] ) # transport shift_horizontal[:, 1:] = x[:, :] shift_horizontal[:, 0] = 0 # temporary value on the boundary : boundary conditions are dealt with later flux_horizontal[U.cell_left_upwind] += ( U.at_vertical_interface[:, :, 0][U.cell_left_upwind] * shift_horizontal[U.cell_left_upwind] ) # transport # Boundary Condition on the x-component of the flux flux_horizontal[U.cell_right_upwind[:, -1], -1] = 0 flux_horizontal[U.cell_left_upwind[:, 0], 0] = 0 # Computation of the flux over the border of the cell for a cartesian mesh sum_faces = ( msh.dy * (flux_horizontal[:, 1:] - flux_horizontal[:, :-1]) + msh.dx * (flux_vertical[1:, :] - flux_vertical[:-1, :]) ) / msh.mass_cell return sum_faces
[docs] class Advection(LinOp): r""" Class containing the advection term linear operator :math:`A:c\mapsto \nabla\cdot(\vec{u}c)~\forall (x,y)\in\Omega~\forall t\in]0;T]` with the related boundary condition :math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}<0\}~\forall t\in]0;T]`. The implementation of this linear operator is a subclass of the :py:class:`~scipy.sparse.linalg.LinearOperator` class. Attributes ---------- U : ~pheromone_dispersion.velocity.Velocity The wind field :math:`\vec{u}(x,y,t)`. msh : ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. minus_U : ~pheromone_dispersion.velocity.Velocity The opposite sign wind field :math:`-\vec{u}(x,y,t)`. shape : tuple of int Shape of the matrix of the linear operator. The shape is (:attr:`msh.y.size` * :attr:`msh.x.size`, :attr:`msh.y.size` * :attr:`msh.x.size`) dtype : ~numpy.dtype Data type of the elements of the matrix of the linear operator. The type is `float64` """
[docs] def __init__(self, U, msh): r""" Constructor method Parameters ---------- U : ~pheromone_dispersion.velocity.Velocity The wind field :math:`\vec{u}(x,y,t)`. msh : ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. """ self.U = U if U.t is None: self.minus_U = Velocity(msh, -U.at_vertical_interface, -U.at_horizontal_interface) else: minus_U_at_vertical_interface = np.zeros((U.t.size, U.at_vertical_interface.shape[0], U.at_vertical_interface.shape[1], 2)) minus_U_at_horizontal_interface = np.zeros( (U.t.size, U.at_horizontal_interface.shape[0], U.at_horizontal_interface.shape[1], 2) ) for it, tc in enumerate(U.t): U.at_current_time(tc) minus_U_at_vertical_interface[it, :, :, :] = -U.at_vertical_interface minus_U_at_horizontal_interface[it, :, :, :] = -U.at_horizontal_interface self.minus_U = Velocity(msh, minus_U_at_vertical_interface, minus_U_at_horizontal_interface, t=U.t) U.at_current_time(U.t[0]) self.msh = msh self.shape = (self.msh.y.size * self.msh.x.size, self.msh.y.size * self.msh.x.size) self.dtype = np.dtype(float)
[docs] def at_current_time(self, tc): r""" Update the attributes :attr:`U` and :attr:`minus_U` at a given time. Parameters ---------- tc : float or integer The current time. Notes ----- Updates the wind fields :attr:`U` and :attr:`minus_U` and their own attributes using the method :meth:`~pheromone_dispersion.velocity.Velocity.at_current_time` of the class :py:class:`~pheromone_dispersion.velocity.Velocity`. """ self.U.at_current_time(tc) self.minus_U.at_current_time(tc)
[docs] def _matvec(self, x_out): r""" Compute the image (matrix-vector product) of :math:`A:c\mapsto \nabla\cdot(\vec{u}c)` with the related boundary condition for a given concentration map :math:`c` at the current time. Parameters ---------- x_out : ~numpy.ndarray The map of concentration of pheromones :math:`c(x,y)` at a given time :math:`t` raveled into a vector. Returns ------- ~numpy.ndarray Array containing the image :math:`\nabla\cdot(\vec{u}c)`. Notes ----- The input :math:`c` has to be raveled into a (:attr:`msh.y.size` * :attr:`msh.x.size`,)-shape array to match the format of the :py:class:`~scipy.sparse.linalg.LinearOperator` class. The same way, the ouput can be reshape into a (:attr:`msh.y.size`, :attr:`msh.x.size`)-shape array to get the map :math:`Ac(x,y)` at the current time :math:`t` """ # reshape the vector (of (msh.x.size*msh.y.size, )) containg the concentration into a matrix (of shape (msh.y.size, msh.x.size)) x = x_out.reshape((self.msh.y.size, self.msh.x.size)) # unknown of the PDE sum_faces = sum_advection_flux_given_U(self.U, x, self.msh) return sum_faces.reshape((self.msh.y.size * self.msh.x.size,))
[docs] def _rmatvec(self, x_out): r""" Compute the image (matrix-vector product) of :math:`A^*:c^*\mapsto - \nabla\cdot(\vec{u}c^*)+(\nabla.\vec{u})c^*` with the related boundary condition for the map of a given element of the adjoint state space at the current time. Parameters ---------- x_out : ~numpy.ndarray The map of an element of the adjoint state space :math:`c^*(x,y)` at a given time :math:`t` raveled into a vector. Returns ------- ~numpy.ndarray Array containing the image :math:`- \nabla\cdot(\vec{u}c^*)+(\nabla.\vec{u})c^*`. Notes ----- The input :math:`c^*` has to be raveled into a (:attr:`msh.y.size` * :attr:`msh.x.size`,)-shape array to match the format of the :py:class:`~scipy.sparse.linalg.LinearOperator` class. The same way, the ouput can be reshape into a (:attr:`msh.y.size`, :attr:`msh.x.size`)-shape array to get the map :math:`A^*c^*(x,y)` at the current time :math:`t`. """ # reshape the vector (of (msh.x.size*msh.y.size, )) containg the concentration into a matrix (of shape (msh.y.size, msh.x.size)) x = x_out.reshape((self.msh.y.size, self.msh.x.size)) # unknown of the PDE sum_faces = sum_advection_flux_given_U(self.minus_U, x, self.msh) + self.U.div * x return sum_faces.reshape((self.msh.y.size * self.msh.x.size,))
def _matmat(self, x_out): # Compute the image (matrix-matrix product) # using a naive method based on the _matvec method return np.hstack([self.matvec(col.reshape(-1, 1)) for col in x_out.T])