Skip to content
Snippets Groups Projects
convection_diffusion_2D.py 20.00 KiB
import os
import sys
import time
from pathlib import Path

import numpy as np
import scipy as sp
from scipy.sparse.linalg import cg
from scipy.sparse.linalg import gmres

from pheromone_dispersion.advection_operator import Advection
from pheromone_dispersion.diffusion_operator import Diffusion
from pheromone_dispersion.identity_operator import Id
from pheromone_dispersion.reaction_operator import Reaction
from pheromone_dispersion.source_term import Source


class DiffusionConvectionReaction2DEquation:
    r"""
    Class containing the pheromone propagation model given by the 2D diffusion-convection-reaction PDE:

    .. math::
        \frac{\partial c}{\partial t}-\nabla\cdot(\mathbf{K}\nabla c)+\nabla\cdot(\vec{u}c)+\tau_{loss}c=s
        ~\forall (x,y)\in\Omega~\forall t\in]0;T]

    with the initial and boundary conditions:

        - a null initial condition :math:`c(x,y,t=0)=0~\forall (x,y)\in\Omega`,
        - a null diffusive flux :math:`\mathbf{K}\nabla c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega`,
        -  null convective influx
           :math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}<0\}~\forall t\in]0;T]`
           with :math:`\vec{n}` the outgoing normal vector,

    and its solvers.

    Attributes
    ----------
    msh: ~pheromone_dispersion.geom.MeshRect2D
        The geometry of the domain.
    A: ~pheromone_dispersion.advection_operator.Advection
        The advection linear operator :math:`A:c\mapsto \nabla\cdot(\vec{u}c)~\forall (x,y)\in\Omega~\forall t\in]0;T]`
        with :math:`\vec{u}c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega\cap \{(x,y)|\vec{u}(x,y,t)\cdot\vec{n}<0\}~\forall t\in]0;T]`.
    D: ~pheromone_dispersion.diffusion_operator.Diffusion
        The diffusion linear operator :math:`D:c\mapsto -\nabla\cdot(\mathbf{K}\nabla c)~\forall (x,y)\in\Omega~\forall t\in]0;T]`
        with :math:`\mathbf{K}\nabla c\cdot\vec{n}=0~\forall (x,y)\in\partial\Omega`.
    R: ~pheromone_dispersion.reaction_operator.Reaction
        The reaction linear operator :math:`R:c\mapsto \tau_{loss}c~\forall (x,y)\in\Omega~\forall t\in]0;T]`.
    ID: ~pheromone_dispersion.identity_operator.Id
        The identity operator :math:`Id:c\mapsto c~\forall (x,y)\in\Omega~\forall t\in]0;T]`.
    S: ~pheromone_dispersion.source_term.Source
        The source term  :math:`s(x,y,t)`.
    implemented_solver_type: list of str
        The list of keywords of the implemented types of time discretization.
        The implemented types of time discretization are:
        `['implicit', 'semi-implicit', 'semi-implicit with matrix inversion', 'implicit with stationnary matrix inversion']`
    time_discretization: str
        The keyword identifying the type of time discretization.
    tol_inversion: float
        Tolerance for the inversion estimation algorithm called at each time step.
    inv_matrix_implicit_part: ~numpy.ndarray or None
        Inverse matrix of the implicit part matrix of the time discretization.
        Initialized to `None`.
    """

    # To do:
    # - add exceptions in case
    #   the inv_matrix_implicit_part is not initialized
    #   but should have been
    # - add exceptions to check inputs of __init__

    def __init__(self, U, K, coeff_depot, S, msh, time_discretization='semi-implicit', tol_inversion=1e-14):
        r"""
        Constructor method

        Parameters
        ----------
        U: ~pheromone_dispersion.velocity.Velocity
            The wind field :math:`\vec{u}(x,y,t)`.
        K: ~pheromone_dispersion.diffusion_tensor.DiffusionTensor
            The diffusion tensor :math:`\mathbf{K}(x,y,t)`.
        coeff_depot: ~numpy.ndarray
            The deposition coefficient :math:`\tau_{loss}(x,y)`.
        S: ~pheromone_dispersion.source_term.Source
            The source term :math:`s(x,y,t)`.
        msh: ~pheromone_dispersion.geom.MeshRect2D
            The geometry of the domain.
        time_discretization: str, default: 'semi-implicit'
            The keyword identifying the type of time discretization.
        tol_inversion: float, optional, default: 1e-14
            Tolerance for the inversion estimation algorithm called at each time step.

        Raises
        ------
        ValueError
            if the type of discretization is not implemented,
            i.e. if :attr:`time_discretization` is not in :attr:`implemented_solver_type`
        """

        self.msh = msh
        self.A = Advection(U, msh)
        self.D = Diffusion(K, msh)
        self.R = Reaction(coeff_depot, msh)
        self.Id = Id(msh)
        self.S = S
        self.time_discretization = time_discretization
        self.tol_inversion = tol_inversion
        self.implemented_solver_type = [
            'implicit',
            'semi-implicit',
            'semi-implicit with matrix inversion',
            'implicit with stationnary matrix inversion',
        ]
        if self.time_discretization not in self.implemented_solver_type:
            raise ValueError("The given time discretization is not implemented.")
        self.inv_matrix_implicit_part = None

    def init_inverse_matrix(self, path_to_matrix=None, matrix_file_name=None):
        """
        Initialize the attribute :attr:`inv_matrix_implicit_part`.
        If the inverse matrix of the implicit part matrix of the time discretization has never been computed, it is computed and stored.
        Otherwise, it loads the previously computed inverse matrix.

        Parameters
        ----------
        path_to_matrix: str, optional, default: None
            Path where to save or load the inverse matrix.
            If not provided, set to `'./data'`.
        matrix_file_name: str, optional, default: None
            Name of the file where the matrix stored or will be saved.
            If not provided, set to `'inv_matrix_**_scheme'`
            with ** either `'implicit'` or `'semi_implicit'` depending on the time discretization.

        Notes
        -----
        This method is usefull only if :attr:`time_discretization` is
        either `'semi-implicit with matrix inversion'` or `'implicit with stationnary matrix inversion'`.
        Otherwise, the linear system to solve at each time steps is solved using conjugate gradient or GMRES algorithm,
        and the attribute :attr:`inv_matrix_implicit_part` is not used.
        """

        # To do:
        # add exceptions in case
        # the given file name or path
        # does not exist

        if path_to_matrix is None:
            path_to_matrix = './data'
        if not os.path.isdir(path_to_matrix):
            os.makedirs(path_to_matrix)
        if matrix_file_name is None:
            if self.time_discretization == 'semi-implicit with matrix inversion':
                matrix_file_name = 'inv_matrix_semi_implicit_scheme'
            if self.time_discretization == 'implicit with stationnary matrix inversion':
                matrix_file_name = 'inv_matrix_implicit_scheme'
        if matrix_file_name[-4:] != '.npy':
            matrix_file_name += '.npy'

        # Compute the inverse of the matrix of the implicit part if the file is not in the folder
        if not (Path(path_to_matrix) / matrix_file_name).exists():
            print("=== Computation of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===")
            Identity = np.identity(self.msh.y.size * self.msh.x.size)

            if self.time_discretization == 'semi-implicit with matrix inversion':
                matrix_semi_implicit_scheme = Identity + self.msh.dt * (-self.D._matmat(Identity) + self.R._matmat(Identity))
                self.inv_matrix_implicit_part = sp.linalg.inv(matrix_semi_implicit_scheme)
                np.save(Path(path_to_matrix) / matrix_file_name, self.inv_matrix_implicit_part)

            if self.time_discretization == 'implicit with stationnary matrix inversion':
                t_i = time.time()
                matrix_implicit_scheme = Identity + self.msh.dt * (
                    -self.D._matmat(Identity) + self.R._matmat(Identity) + self.A._matmat(Identity)
                )
                self.inv_matrix_implicit_part = sp.linalg.inv(matrix_implicit_scheme)
                print("--- Computation at time t= ", self.msh.t, "in ", time.time() - t_i, " s")
                t_i = time.time()
                np.save(Path(path_to_matrix) / matrix_file_name, self.inv_matrix_implicit_part)

        # Load the inverse of the matrix of the implicit part if the file is in the folder
        else:
            print("=== Load of the inverse of the matrix of the implicit part of the " + self.time_discretization + " scheme ===")
            if self.time_discretization == 'semi-implicit with matrix inversion':
                self.inv_matrix_implicit_part = np.load(Path(path_to_matrix) / matrix_file_name)
            if self.time_discretization == 'implicit with stationnary matrix inversion':
                self.inv_matrix_implicit_part = np.load(Path(path_to_matrix) / matrix_file_name)

    def set_source(self, value, t=None):
        """
        Update the attribute :attr:`S` with the provided new values.

        Parameters
        ----------
        value: ~numpy.ndarray
            The new values of :attr:`S`.
        t: ~numpy.ndarray, optional, default: None
            The associated time array. `None` if the source is stationary.
        """

        self.S = Source(self.msh, value, t=t)

    def at_current_time(self, tc):
        r"""
        Update the attributes :attr:`S`, :attr:`D` and :attr:`A` at a given time.

        Parameters
        ----------
        tc : float or integer
            The current time.

        Notes
        -----
        Updates the attributes :attr:`S`, :attr:`D` and :attr:`A` and their own attributes
        using the method :meth:`~pheromone_dispersion.source_term.Source.at_current_time` of resp.
        the class :class:`~pheromone_dispersion.source_term.Source`,
        the class :class:`~pheromone_dispersion.diffusion_operator.Diffusion` and
        the class :class:`~pheromone_dispersion.advection_operator.Advection`.
        """
        self.S.at_current_time(tc)

        if not self.time_discretization == 'implicit with stationnary matrix inversion':
            self.A.at_current_time(tc)
            self.D.at_current_time(tc)

    def solver(self, save_flag=False, path_save='./save/', display_flag=True, store_rate=1):
        r"""
        Compute the concentration :math:`c(x,y,t)` by solving the pheromone propagation model on the whole time window.

        Parameters
        ----------
        save_flag: bool, optional, default: False
            If `True`, the resulting matrix of the concentration is saved.
        path_save: str, optional, default: './save/'
            Path of the directory in which the outputs are saved.
        display_flag: bool, default: True
            If `True`, print the evolution of the solver through the time iterations.
        store_rate: int, optional, default: 1
            Time frequency to which the concentration map is stored.

        Returns
        -------
        t_save: ~numpy.ndarray
            The array containing the times :math:`t` at which the concentration maps are stored.
        c_save: ~numpy.ndarray
            The concentration maps :math:`c(x,y)` at several times :math:`t`.
        """

        # initialization of the unknown variable at the current time
        c = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],))

        # initialization of the outputs array to be saved
        if save_flag:
            # if the save directory does not exist, then it is created
            if not os.path.isdir(path_save):
                os.makedirs(path_save)
        t_save = []
        c_save = []

        # loop until the final time or the steady state is reached
        for it, self.msh.t in enumerate(self.msh.t_array[1:]):
            if display_flag:
                sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s')
                sys.stdout.flush()

            # update the coefficients of the equation at the current time and
            self.at_current_time(self.msh.t)
            # store the concentration at the previous time step
            c_old = np.copy(c)  # NECESSARY???

            # inverse the linear system resulting the semi-implicit time discretization
            # using the pre-computed inverse matrix
            if self.time_discretization == 'semi-implicit with matrix inversion':
                c = self.inv_matrix_implicit_part.dot(c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()))
                info = 0
            # inverse the linear system resulting the semi-implicit time discretization
            # using a conjugate gradient method for the current time step
            elif self.time_discretization == 'semi-implicit':
                c, info = cg(
                    self.Id + self.msh.dt * (-self.D + self.R),
                    c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()),
                    x0=c_old,
                    tol=self.tol_inversion,
                )
            # inverse the linear system resulting the implicit time discretization using a gmres method for the current time step
            elif self.time_discretization == 'implicit':
                c, info = gmres(
                    self.Id + self.msh.dt * (-self.D + self.R + self.A),
                    c_old + self.msh.dt * self.S.value.ravel(),
                    x0=c_old,
                    rtol=self.tol_inversion,
                )
            # inverse the linear system resulting the implicit time discretization
            # using the pre-computed inverse matrix
            elif self.time_discretization == 'implicit with stationnary matrix inversion':
                RHS = c_old + self.msh.dt * self.S.value.ravel()
                LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A)).matvec(c_old)
                flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS)
                if flag_residu:
                    c = np.dot(self.inv_matrix_implicit_part, RHS)
                info = 0

            if info > 0:
                raise ValueError(
                    "The algorithme used to solve the linear system has not converge"
                    + "to the expected tolerance or within the maximum number of iteration."
                )

            if info < 0:
                raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.")

            # store the outputs
            if it % store_rate == 0:
                t_save.append(self.msh.t)
                c_save.append(c.reshape((self.msh.y.shape[0], self.msh.x.shape[0])))

        # save the ouputs
        if save_flag:
            np.save(Path(path_save) / 'c_save.npy', c_save)
            np.save(Path(path_save) / 't_save.npy', t_save)

        return np.array(t_save), np.array(c_save)

    def solver_est_at_obs_times(self, obs, display_flag=True):
        """
        Compute the concentration :math:`c(x,y,t)` by solving the pheromone propagation model on the whole time window
        and store the results at the times and positions required to estimate the observed variable
        in the attribute :attr:`~source_localization.obs.Obs.c_est` of the given object of the class :class:`~source_localization.obs.Obs`.

        Parameters
        ----------
        obs: ~source_localization.obs.Obs
            Object containing all the features related to the observations and estimation of the observed variables
            and in which :math:`c(x,y,t)` is stored at the times and positions required to estimate the observed variable.
        display_flag: bool, default: True
            If `True`, print the evolution of the solver through the time iterations.
        """

        # initialization of the unknown variable at the current time
        c = np.zeros((self.msh.y.shape[0] * self.msh.x.shape[0],))

        if 0 in obs.index_obs_to_index_time_est:
            c_prov = c.reshape((self.msh.y.shape[0], self.msh.x.shape[0]))
            for index_obs in obs.index_time_est_to_index_obs[0]:
                index_x_est = np.argmin(np.abs(self.msh.x - obs.X_obs[index_obs, 0]))
                index_y_est = np.argmin(np.abs(self.msh.y - obs.X_obs[index_obs, 1]))
                obs.c_est[index_obs] = c_prov[index_y_est, index_x_est]

        # loop until the final time or the steady state is reached
        for it, self.msh.t in enumerate(self.msh.t_array[1:]):
            if display_flag:
                sys.stdout.write(f'\rt = {"{:.3f}".format(self.msh.t)} / {"{:.3f}".format(self.msh.T_final)} s')
                sys.stdout.flush()
            # update the coefficients of the equation at the current time and
            self.at_current_time(self.msh.t)
            # store the concentration at the previous time step
            c_old = np.copy(c)  # NECESSARY???

            # inverse the linear system resulting the semi-implicit time discretization
            # using the pre-computed inverse matrix
            if self.time_discretization == 'semi-implicit with matrix inversion':
                c = self.inv_matrix_implicit_part.dot(c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()))
                info = 0
            # inverse the linear system resulting the semi-implicit time discretization
            # using a conjugate gradient method for the current time step
            elif self.time_discretization == 'semi-implicit':
                c, info = cg(
                    self.Id + self.msh.dt * (-self.D + self.R),
                    c_old + self.msh.dt * (-self.A.matvec(c_old) + self.S.value.ravel()),
                    x0=c_old,
                    tol=self.tol_inversion,
                )
            # inverse the linear system resulting the implicit time discretization using a gmres method for the current time step
            elif self.time_discretization == 'implicit':
                c, info = gmres(
                    self.Id + self.msh.dt * (-self.D + self.R + self.A),
                    c_old + self.msh.dt * self.S.value.ravel(),
                    x0=c_old,
                    rtol=self.tol_inversion,
                )
            # inverse the linear system resulting the implicit time discretization
            # using the pre-computed inverse matrix
            elif self.time_discretization == 'implicit with stationnary matrix inversion':
                RHS = c_old + self.msh.dt * self.S.value.ravel()
                if not np.linalg.norm(RHS, ord=np.inf) < 1e-16:
                    LHS = (self.Id + self.msh.dt * (-self.D + self.R + self.A)).matvec(c_old)
                    flag_residu = not np.linalg.norm(RHS - LHS) < self.tol_inversion * np.linalg.norm(RHS)
                    # print(np.linalg.norm(RHS),np.linalg.norm(RHS, ord=np.inf))
                    if flag_residu:
                        c = np.dot(self.inv_matrix_implicit_part, RHS)
                else:
                    c = np.zeros_like(c)  # print("the norm is 0")
                # c = np.dot(self.inv_matrix_implicit_part, c_old + self.msh.dt * self.S.value.ravel())
                info = 0

            if info > 0:
                raise ValueError(
                    "The algorithme used to solve the linear system has not converge"
                    + "to the expected tolerance or within the maximum number of iteration."
                )

            if info < 0:
                raise ValueError("The algorithme used to solve the linear system could not proceed du to illegal input or breakdown.")

            it += 1
            # store the output in obs.c_est if necessary
            if it in obs.index_obs_to_index_time_est:
                c_prov = c.reshape((self.msh.y.shape[0], self.msh.x.shape[0]))
                for index_obs in obs.index_time_est_to_index_obs[it]:
                    i = np.where(obs.index_obs_to_index_time_est[index_obs, :] == it)
                    index_x_est = np.argmin(np.abs(self.msh.x - obs.X_obs[index_obs, 0]))
                    index_y_est = np.argmin(np.abs(self.msh.y - obs.X_obs[index_obs, 1]))
                    obs.c_est[index_obs, i] = c_prov[index_y_est, index_x_est]