from datetime import timedelta
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.interpolate import LinearNDInterpolator as interp2d
from scipy.interpolate import interp1d
"""
Module that contains the implementation of the wind velocity field.
"""
[docs]
class Velocity:
r"""
Class containing the wind velocity field :math:`\vec{u}(x,y,t)=(u(x,y,t),v(x,y,t))`.
Attributes
----------
at_vertical_interface: ~numpy.ndarray
The wind velocity field :math:`\vec{u}(x,y)` at the vertical interfaces of each cells of the mesh at the current time :math:`t`.
at_horizontal_interface: ~numpy.ndarray
The wind velocity field :math:`\vec{u}(x,y)` at the horizontal interfaces of each cells of the mesh at the current time :math:`t`.
div : ~numpy.ndarray
The divergence of the wind velocity field
:math:`div~\vec{u}(x,y)=\partial_xu(x,y)+\partial_yv(x,y)`
at the center of each cells of the mesh at the current time :math:`t`.
t : ~numpy.ndarray
The array of times :math:`t` at which the velocity fields maps :math:`\vec{u}(x,y)` are given.
If :math:`\vec{u}` is stationary, set to `None`.
time_interpolation_at_vertical_interface : scipy.interpolate.interp1d
Callable function used to compute the wind velocity field :math:`\vec{u}(x,y)`
at the vertical interfaces of each cells of the mesh given a time :math:`t`.
Computation using the linear interpolation of the wind fields provided on the given time array.
time_interpolation_at_horizontal_interface : scipy.interpolate.interp1d
Callable function used to compute the wind velocity field :math:`\vec{u}(x,y)`
at the horizontal interfaces of each cells of the mesh given a time :math:`t`.
Computation using the linear interpolation of the wind fields provided on the given time array.
time_interpolation_div : scipy.interpolate.interp1d
Callable function used to compute the divergence of the wind velocity field :math:`div~\vec{u}(x,y)` given a time :math:`t`.
Computation using the linear interpolation of the divergence of the wind fields originally computed on the given time array.
cell_above_upwind : ~numpy.ndarray of boolean
For each cell, is `True` if the cell above is the upwind cell
for the :math:`y`-component of the wind velocity :math:`v`
at the current time :math:`t`.
cell_under_upwind : ~numpy.ndarray of boolean
For each cell, is `True` if the cell under is the upwind cell
for the :math:`y`-component of the wind velocity :math:`v`
at the current time :math:`t`
cell_right_upwind : ~numpy.ndarray of boolean
For each cell, is `True` if the right cell is the upwind cell
for the :math:`x`-component of the wind velocity :math:`u`
at the current time :math:`t`
cell_left_upwind : ~numpy.ndarray of boolean
For each cell, is `True` if the left cell is the upwind cell
for the :math:`x`-component of the wind velocity :math:`u`
at the current time :math:`t`
max_horizontal_U : float
The :math:`L^\infty` norm of the horizontal component of the velocity field :math:`max_{(x,y,t)}(|u(x,y,t)|)`.
max_vertical_U : float
The :math:`L^\infty` norm of the vertical component of the velocity field :math:`max_{(x,y,t)}(|v(x,y,t)|)`.
"""
[docs]
def __init__(self, msh, U_at_vertical_interface, U_at_horizontal_interface, t=None):
r"""
Constructor method.
Parameters
----------
msh : ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
U_at_vertical_interface : ~numpy.ndarray
The wind velocity fields :math:`\vec{u}(x,y)` at one or multiple times at the vertical interfaces of each cells of the mesh.
U_at_horizontal_interface : ~numpy.ndarray
The wind velocity fields :math:`\vec{u}(x,y)` at one or multiple times at the horizontal interfaces of each cells of the mesh.
t : ~numpy.ndarray or None, default: None
The array of the times :math:`t` at which the wind velocity fields :math:`\vec{u}(x,y)` are given.
`None` if the :math:`\vec{u}` is stationary.
Raises
------
ValueError
if the shape of the provided wind velocity fields :math:`\vec{u}(x,y)`
does not fit with the shape of the arrays of the coordinates
of the cells' interfaces and of the provided time array.
"""
self.t = t
if self.t is not None:
if U_at_horizontal_interface.ndim != 4 or U_at_vertical_interface.ndim != 4:
raise ValueError(
"The number dimension of the velocity field is incorrect,"
+ " the velocity field is unsteady,"
+ " the number of dimension shoud be 3"
)
if self.t.size != U_at_vertical_interface.shape[0] or self.t.size != U_at_horizontal_interface.shape[0]:
raise ValueError("The shape of the velocity field does not coincide with the shape of the time vector")
div_U_all_t = ((U_at_horizontal_interface[:, 1:, :, 1] - U_at_horizontal_interface[:, :-1, :, 1]) / msh.dy) + (
(U_at_vertical_interface[:, :, 1:, 0] - U_at_vertical_interface[:, :, :-1, 0]) / msh.dx
)
self.at_vertical_interface = U_at_vertical_interface[0, :, :, :]
self.at_horizontal_interface = U_at_horizontal_interface[0, :, :, :]
self.div = div_U_all_t[0, :, :]
self.time_interpolation_at_vertical_interface = interp1d(t, U_at_vertical_interface, axis=0)
self.time_interpolation_at_horizontal_interface = interp1d(t, U_at_horizontal_interface, axis=0)
self.time_interpolation_div = interp1d(t, div_U_all_t, axis=0)
self.max_horizontal_U = np.max(
(np.max(np.abs(U_at_vertical_interface[:, :, :, 0])), np.max(np.abs(U_at_horizontal_interface[:, :, :, 0])))
)
self.max_vertical_U = np.max(
(np.max(np.abs(U_at_vertical_interface[:, :, :, 1])), np.max(np.abs(U_at_horizontal_interface[:, :, :, 1])))
)
else:
if U_at_horizontal_interface.ndim != 3 or U_at_vertical_interface.ndim != 3:
raise ValueError(
"The number dimension of the velocity field is incorrect,"
+ " the velocity field is unsteady,"
+ " the number of dimension shoud be 2"
)
self.at_vertical_interface = U_at_vertical_interface
self.at_horizontal_interface = U_at_horizontal_interface
self.div = ((self.at_horizontal_interface[1:, :, 1] - self.at_horizontal_interface[:-1, :, 1]) / msh.dy) + (
(self.at_vertical_interface[:, 1:, 0] - self.at_vertical_interface[:, :-1, 0]) / msh.dx
)
self.max_horizontal_U = np.max((np.max(U_at_vertical_interface[:, :, 0]), np.max(U_at_horizontal_interface[:, :, 0])))
self.max_vertical_U = np.max((np.max(U_at_vertical_interface[:, :, 1]), np.max(U_at_horizontal_interface[:, :, 1])))
self.cell_above_upwind = self.at_horizontal_interface[:, :, 1] < 0
self.cell_under_upwind = self.at_horizontal_interface[:, :, 1] > 0
self.cell_right_upwind = self.at_vertical_interface[:, :, 0] < 0
self.cell_left_upwind = self.at_vertical_interface[:, :, 0] >= 0
if self.at_vertical_interface.shape != (msh.y.size, msh.x_vertical_interface.size, 2):
raise ValueError(
"The shape of the velocity field at the vertical and horizontal interfaces do not match with the shape of the msh."
)
if (
self.at_vertical_interface.shape[0] + 1 != self.at_horizontal_interface.shape[0]
or self.at_horizontal_interface.shape[1] + 1 != self.at_vertical_interface.shape[1]
):
raise ValueError(
"The shape of the velocity field at the vertical interfaces"
+ "(that is (nb of cells along the y axis, nb of cells along the x axis + 1)) "
+ "does not coincide with the shape of the velocity field at the horizontal interfaces"
+ "(that is (nb of cells along the y axis + 1, nb of cells along the x axis))."
)
[docs]
def at_current_time(self, tc):
r"""
Update the attributes :attr:`at_vertical_interface`,
:attr:`at_horizontal_interface` and :attr:`div` at a given time
using the linear interpolation callable attributes resp
:attr:`time_interpolation_at_vertical_interface`,
:attr:`time_interpolation_at_horizontal_interface` and
:attr:`time_interpolation_div`.
Parameters
----------
tc : float or integer
The current time.
Raises
------
ValueError
if the given time is not between the first and last times of the attribute :attr:`t`,
i.e. between the first and last times at which the wind velocity fields are given.
"""
if self.t is not None:
if tc < min(self.t) or tc > max(self.t):
raise ValueError("The given time must be between the lowest and largest times contained in the time vector.")
self.at_vertical_interface = self.time_interpolation_at_vertical_interface(tc)
self.at_horizontal_interface = self.time_interpolation_at_horizontal_interface(tc)
self.div = self.time_interpolation_div(tc)
self.cell_above_upwind = self.at_horizontal_interface[:, :, 1] < 0
self.cell_under_upwind = self.at_horizontal_interface[:, :, 1] >= 0
self.cell_right_upwind = self.at_vertical_interface[:, :, 0] < 0
self.cell_left_upwind = self.at_vertical_interface[:, :, 0] >= 0
[docs]
def velocity_field_from_meteo_data(path_data, file_name_data, msh):
r"""
Derive the wind velocity field :math:`\vec{u}` as an object of the :class:`Velocity` class from meteorogical data.
Read the meteorological wind velocity data,
linearly interpolate in space the meteorological wind data
on the horizontal and vertical interfaces of each cells of the mesh
and return an object of the :class:`Velocity` class
containing the linear interpolation of these data on the mesh.
Parameters
----------
path_data: str
Path to the folder that contains the meteorogical data.
file_name_data: str
Name of the file that contains the meteorological data.
msh: ~pheromone_dispersion.geom.MeshRect2D
The geometry of the domain.
Returns
-------
Velocity
The velocity field :math:`\vec{u}` obtained by linear interpolation of the meteorological data on the mesh.
"""
# To do:
# add exceptions
# especially to check that the domain is covered
df = pd.read_csv(Path(path_data) / file_name_data)
nb_times = len(df['step'].unique())
t = np.zeros((nb_times,))
U_at_vertical_interface = np.zeros((nb_times, msh.y.size, msh.x_vertical_interface.size, 2))
U_at_horizontal_interface = np.zeros((nb_times, msh.y_horizontal_interface.size, msh.x.size, 2))
xx_at_vertical_interface, yy_at_vertical_interface = np.meshgrid(msh.x_vertical_interface, msh.y)
xx_at_horizontal_interface, yy_at_horizontal_interface = np.meshgrid(msh.x, msh.y_horizontal_interface)
for i_step, step in enumerate(df['step'].unique()):
d, _, hms = step.split(" ")
h, m, s = hms.split(":")
t[i_step] = timedelta(days=int(d), seconds=int(s), minutes=int(m), hours=int(h)).total_seconds()
df_tc = df.loc[df['step'] == step]
interp_u = interp2d(list(zip(df_tc['X'], df_tc['Y'])), df_tc['u10'])
interp_v = interp2d(list(zip(df_tc['X'], df_tc['Y'])), df_tc['v10'])
U_at_vertical_interface[i_step, :, :, 0] = interp_u(xx_at_vertical_interface, yy_at_vertical_interface)
U_at_vertical_interface[i_step, :, :, 1] = interp_v(xx_at_vertical_interface, yy_at_vertical_interface)
U_at_horizontal_interface[i_step, :, :, 0] = interp_u(xx_at_horizontal_interface, yy_at_horizontal_interface)
U_at_horizontal_interface[i_step, :, :, 1] = interp_v(xx_at_horizontal_interface, yy_at_horizontal_interface)
return Velocity(msh, U_at_vertical_interface, U_at_horizontal_interface, t=t)