import os import sys import time from pathlib import Path import numpy as np import scipy as sp from scipy.sparse.linalg import cg from scipy.sparse.linalg import gmres from pheromone_dispersion.advection_operator import Advection from pheromone_dispersion.diffusion_operator import Diffusion from pheromone_dispersion.identity_operator import Id from pheromone_dispersion.reaction_operator import Reaction from pheromone_dispersion.source_term import Source class DiffusionConvectionReaction2DEquation: r""" Class containing the pheromone propagation model given by the 2D diffusion-convection-reaction PDE: .. math:: \frac{\partial c}{\partial t}-\nabla\cdot(\mathbf{K}\nabla c)+\nabla\cdot(\vec{u}c)+\tau_{loss}c=s ~\forall (x,y)\in\Omega~\forall t\in]0;T] with the initial and boundary conditions: - a null initial condition :math:`c(x,y,t=0)=0~\forall (x,y)\in\Omega`, - a null diffusive flux :math:`\mathbf{K}\nabla c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega`, - null convective influx :math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}<0\}~\forall t\in]0;T]` with :math:`\vec{n}` the outgoing normal vector, and its solvers. Attributes ---------- msh: ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. A: ~pheromone_dispersion.advection_operator.Advection The advection linear operator :math:`A:c\mapsto \nabla\cdot(\vec{u}c)~\forall (x,y)\in\Omega~\forall t\in]0;T]` with :math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}<0\}~\forall t\in]0;T]`. D: ~pheromone_dispersion.diffusion_operator.Diffusion The diffusion linear operator :math:`D:c\mapsto -\nabla\cdot(\mathbf{K}\nabla c)~\forall (x,y)\in\Omega~\forall t\in]0;T]` with :math:`\mathbf{K}\nabla c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega`. R: ~pheromone_dispersion.reaction_operator.Reaction The reaction linear operator :math:`R:c\mapsto \tau_{loss}c~\forall (x,y)\in\Omega~\forall t\in]0;T]`. ID: ~pheromone_dispersion.identity_operator.Id The identity operator :math:`Id:c\mapsto c~\forall (x,y)\in\Omega~\forall t\in]0;T]`. S: ~pheromone_dispersion.source_term.Source The source term :math:`s(x,y,t)`. implemented_solver_type: list of str The list of keywords of the implemented types of time discretization. The implemented types of time discretization are: `['implicit', 'semi-implicit', 'semi-implicit with matrix inversion', 'implicit with stationnary matrix inversion']` time_discretization: str The keyword identifying the type of time discretization. tol_inversion: float Tolerance for the inversion estimation algorithm called at each time step. inv_matrix_implicit_part: ~numpy.ndarray or None Inverse matrix of the implicit part matrix of the time discretization. Initialized to `None`. """ # To do: # - add exceptions in case # the inv_matrix_implicit_part is not initialized # but should have been # - add exceptions to check inputs of __init__ def __init__(self, U, K, coeff_depot, S, msh, time_discretization='semi-implicit', tol_inversion=1e-14): r""" Constructor method Parameters ---------- U: ~pheromone_dispersion.velocity.Velocity The wind field :math:`\vec{u}(x,y,t)`. K: ~pheromone_dispersion.diffusion_tensor.DiffusionTensor The diffusion tensor :math:`\mathbf{K}(x,y,t)`. coeff_depot: ~numpy.ndarray The deposition coefficient :math:`\tau_{loss}(x,y)`. S: ~pheromone_dispersion.source_term.Source The source term :math:`s(x,y,t)`. msh: ~pheromone_dispersion.geom.MeshRect2D The geometry of the domain. time_discretization: str, default: 'semi-implicit' The keyword identifying the type of time discretization. tol_inversion: float, optional, default: 1e-14 Tolerance for the inversion estimation algorithm called at each time step. Raises ------ ValueError if the type of discretization is not implemented, i.e. if :attr:`time_discretization` is not in :attr:`implemented_solver_type` """ self.msh = msh self.A = Advection(U, msh) self.D = Diffusion(K, msh) self.R = Reaction(coeff_depot, msh) self.Id = Id(msh) self.S = S self.time_discretization = time_discretization self.tol_inversion = tol_inversion self.implemented_solver_type = [ 'implicit', 'semi-implicit', 'semi-implicit with matrix inversion', 'implicit with stationnary matrix inversion', ] if self.time_discretization not in self.implemented_solver_type: raise ValueError("The given time discretization is not implemented.") self.inv_matrix_implicit_part = None def init_inverse_matrix(self, path_to_matrix=None, matrix_file_name=None): """ Initialize the attribute :attr:`inv_matrix_implicit_part`. If the inverse matrix of the implicit part matrix of the time discretization has never been computed, it is computed and stored. Otherwise, it loads the previously computed inverse matrix. Parameters ---------- path_to_matrix: str, optional, default: None Path where to save or load the inverse matrix. If not provided, set to `'./data'`. matrix_file_name: str, optional, default: None Name of the file where the matrix stored or will be saved. If not provided, set to `'inv_matrix_**_scheme'` with ** either `'implicit'` or `'semi_implicit'` depending on the time discretization. Notes ----- This method is usefull only if :attr:`time_discretization` is either `'semi-implicit with matrix inversion'` or `'implicit with stationnary matrix inversion'`. Otherwise, the linear system to solve at each time steps is solved using conjugate gradient or GMRES algorithm, and the attribute :attr:`inv_matrix_implicit_part` is not used. """ # To do: # add exceptions in case # the given file name or path # does not exist if path_to_matrix is None: path_to_matrix = './data' if not os.path.isdir(path_to_matrix): os.makedirs(path_to_matrix) if matrix_file_name is None: if self.time_discretization == 'semi-implicit with matrix inversion': matrix_file_name = 'inv_matrix_semi_implicit_scheme' if self.time_discretization == 'implicit with stationnary matrix inversion': matrix_file_name = 'inv_matrix_implicit_scheme' if matrix_file_name[-4:] != '.npy': matrix_file_name += '.npy' # Compute the inverse of the matrix of the implicit part if the file is not in the folder if not (Path(path_to_matrix) / matrix_file_name).exists(): print("=== Computation of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===") Identity = np.identity(self.msh.y.size * self.msh.x.size) if self.time_discretization == 'semi-implicit with matrix inversion': matrix_semi_implicit_scheme = Identity + self.msh.dt * (-self.D._matmat(Identity) + self.R._matmat(Identity)) self.inv_matrix_implicit_part = sp.linalg.inv(matrix_semi_implicit_scheme) np.save(Path(path_to_matrix) / matrix_file_name, self.inv_matrix_implicit_part) if self.time_discretization == 'implicit with stationnary matrix inversion': t_i = time.time() matrix_implicit_scheme = Identity + self.msh.dt * ( -self.D._matmat(Identity) + self.R._matmat(Identity) + self.A._matmat(Identity) ) self.inv_matrix_implicit_part = sp.linalg.inv(matrix_implicit_scheme) print("--- Computation at time t= ", self.msh.t, "in ", time.time() - t_i, " s") t_i = time.time() np.save(Path(path_to_matrix) / matrix_file_name, self.inv_matrix_implicit_part) # Load the inverse of the matrix of the implicit part if the file is in the folder else: print("=== Load of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===") if self.time_discretization == 'semi-implicit with matrix inversion': self.inv_matrix_implicit_part = np.load(Path(path_to_matrix) / matrix_file_name) if self.time_discretization == 'implicit with stationnary matrix inversion': self.inv_matrix_implicit_part = np.load(Path(path_to_matrix) / matrix_file_name) def set_source(self, value, t=None): """ Update the attribute :attr:`S` with the provided new values. Parameters ---------- value: ~numpy.ndarray The new values of :attr:`S`. t: ~numpy.ndarray, optional, default: None The associated time array. `None` if the source is stationary. """ self.S = Source(self.msh, value, t=t) def at_current_time(self, tc): r""" Update the attributes :attr:`S`, :attr:`D` and :attr:`A` at a given time. Parameters ---------- tc : float or integer The current time. Notes ----- Updates the attributes :attr:`S`, :attr:`D` and :attr:`A` and their own attributes using the method :meth:`~pheromone_dispersion.source_term.Source.at_current_time` of resp. the class :class:`~pheromone_dispersion.source_term.Source`, the class :class:`~pheromone_dispersion.diffusion_operator.Diffusion` and the class :class:`~pheromone_dispersion.advection_operator.Advection`. """ self.S.at_current_time(tc) if not self.time_discretization == 'implicit with stationnary matrix inversion': self.A.at_current_time(tc) self.D.at_current_time(tc) def solver(self, save_flag=False, path_save='./save/', display_flag=True, store_rate=1): r""" Compute the concentration :math:`c(x,y,t)` by solving the pheromone propagation model on the whole time window. Parameters ---------- save_flag: bool, optional, default: False If `True`, the resulting matrix of the concentration is saved. path_save: str, optional, default: './save/' Path of the directory in which the outputs are saved. display_flag: bool, default: True If `True`, print the evolution of the solver through the time iterations. store_rate: int, optional, default: 1 Time frequency to which the concentration map is stored. Returns ------- t_save: ~numpy.ndarray The array containing the times :math:`t` at which the concentration maps are stored. c_save: ~numpy.ndarray The concentration maps :math:`c(x,y)` at several times :math:`t`. """ # initialization of the unknown variable at the current time c = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],)) # initialization of the outputs array to be saved if save_flag: # if the save directory does not exist, then it is created if not os.path.isdir(path_save): os.makedirs(path_save) t_save = [] c_save = [] # loop until the final time or the steady state is reached for it, self.msh.t in enumerate(self.msh.t_array[1:]): if display_flag: sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s') sys.stdout.flush() # update the coefficients of the equation at the current time and self.at_current_time(self.msh.t) # store the concentration at the previous time step c_old = np.copy(c) # NECESSARY??? # inverse the linear system resulting the semi-implicit time discretization # using the pre-computed inverse matrix if self.time_discretization == 'semi-implicit with matrix inversion': c = self.inv_matrix_implicit_part.dot(c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel())) info = 0 # inverse the linear system resulting the semi-implicit time discretization # using a conjugate gradient method for the current time step elif self.time_discretization == 'semi-implicit': c, info = cg( self.Id + self.msh.dt * (-self.D + self.R), c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()), x0=c_old, tol=self.tol_inversion, ) # inverse the linear system resulting the implicit time discretization using a gmres method for the current time step elif self.time_discretization == 'implicit': c, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A), c_old + self.msh.dt * self.S.value.ravel(), x0=c_old, rtol=self.tol_inversion, ) # inverse the linear system resulting the implicit time discretization # using the pre-computed inverse matrix elif self.time_discretization == 'implicit with stationnary matrix inversion': RHS = c_old + self.msh.dt * self.S.value.ravel() LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A)).matvec(c_old) flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS) if flag_residu: c = np.dot(self.inv_matrix_implicit_part, RHS) info = 0 if info > 0: raise ValueError( "The algorithme used to solve the linear system has not converge" + "to the expected tolerance or within the maximum number of iteration." ) if info < 0: raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.") # store the outputs if it % store_rate == 0: t_save.append(self.msh.t) c_save.append(c.reshape((self.msh.y.shape[0], self.msh.x.shape[0]))) # save the ouputs if save_flag: np.save(Path(path_save) / 'c_save.npy', c_save) np.save(Path(path_save) / 't_save.npy', t_save) return np.array(t_save), np.array(c_save) def solver_est_at_obs_times(self, obs, display_flag=True): """ Compute the concentration :math:`c(x,y,t)` by solving the pheromone propagation model on the whole time window and store the results at the times and positions required to estimate the observed variable in the attribute :attr:`~source_localization.obs.Obs.c_est` of the given object of the class :class:`~source_localization.obs.Obs`. Parameters ---------- obs: ~source_localization.obs.Obs Object containing all the features related to the observations and estimation of the observed variables and in which :math:`c(x,y,t)` is stored at the times and positions required to estimate the observed variable. display_flag: bool, default: True If `True`, print the evolution of the solver through the time iterations. """ # initialization of the unknown variable at the current time c = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],)) if 0 in obs.index_obs_to_index_time_est: c_prov = c.reshape((self.msh.y.shape[0], self.msh.x.shape[0])) for index_obs in obs.index_time_est_to_index_obs[0]: index_x_est = np.argmin(np.abs(self.msh.x - obs.X_obs[index_obs, 0])) index_y_est = np.argmin(np.abs(self.msh.y - obs.X_obs[index_obs, 1])) obs.c_est[index_obs] = c_prov[index_y_est, index_x_est] # loop until the final time or the steady state is reached for it, self.msh.t in enumerate(self.msh.t_array[1:]): if display_flag: sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s') sys.stdout.flush() # update the coefficients of the equation at the current time and self.at_current_time(self.msh.t) # store the concentration at the previous time step c_old = np.copy(c) # NECESSARY??? # inverse the linear system resulting the semi-implicit time discretization # using the pre-computed inverse matrix if self.time_discretization == 'semi-implicit with matrix inversion': c = self.inv_matrix_implicit_part.dot(c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel())) info = 0 # inverse the linear system resulting the semi-implicit time discretization # using a conjugate gradient method for the current time step elif self.time_discretization == 'semi-implicit': c, info = cg( self.Id + self.msh.dt * (-self.D + self.R), c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()), x0=c_old, tol=self.tol_inversion, ) # inverse the linear system resulting the implicit time discretization using a gmres method for the current time step elif self.time_discretization == 'implicit': c, info = gmres( self.Id + self.msh.dt * (-self.D + self.R + self.A), c_old + self.msh.dt * self.S.value.ravel(), x0=c_old, rtol=self.tol_inversion, ) # inverse the linear system resulting the implicit time discretization # using the pre-computed inverse matrix elif self.time_discretization == 'implicit with stationnary matrix inversion': RHS = c_old + self.msh.dt * self.S.value.ravel() if not np.linalg.norm(RHS, ord=np.inf) < 1e-16: LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A)).matvec(c_old) flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS) # print(np.linalg.norm(RHS),np.linalg.norm(RHS, ord=np.inf)) if flag_residu: c = np.dot(self.inv_matrix_implicit_part, RHS) else: c = np.zeros_like(c) # print("the norm is 0") # c = np.dot(self.inv_matrix_implicit_part, c_old + self.msh.dt * self.S.value.ravel()) info = 0 if info > 0: raise ValueError( "The algorithme used to solve the linear system has not converge" + "to the expected tolerance or within the maximum number of iteration." ) if info < 0: raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.") it += 1 # store the output in obs.c_est if necessary if it in obs.index_obs_to_index_time_est: c_prov = c.reshape((self.msh.y.shape[0], self.msh.x.shape[0])) for index_obs in obs.index_time_est_to_index_obs[it]: i = np.where(obs.index_obs_to_index_time_est[index_obs, :] == it) index_x_est = np.argmin(np.abs(self.msh.x - obs.X_obs[index_obs, 0])) index_y_est = np.argmin(np.abs(self.msh.y - obs.X_obs[index_obs, 1])) obs.c_est[index_obs, i] = c_prov[index_y_est, index_x_est]