Source code for rameau.core.simulation

# Copyright 2025, BRGM
# 
# This file is part of Rameau.
# 
# Rameau is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
# 
# Rameau is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License along with
# Rameau. If not, see <https://www.gnu.org/licenses/>.
#

"""
Rameau simulations
"""
import os
import warnings
from typing import Literal, List, Optional, Union
import pandas as pd
import numpy as np
import datetime

from rameau.wrapper import (
    CSimulation, COptiSimulation, CForecastSimulation # type: ignore
)
from rameau.core.settings import (
    SimulationSettings,
    ForecastSettings,
    OptimizationSettings,
    OutputSettings
)
from rameau.core import Tree
from rameau.core.states import StatesCollection
from rameau.core.inputs import InputCollection

from rameau.core._utils import wrap_property, _check_literal, _get_datetime
from rameau.core._descriptor import _GetDerivedTypeDecriptor, _IntDescriptor


class Simulation():
    """rameau simulations.

    Python front-end of a wrapped ``CSimulation`` object (``self._m``). It
    exposes the simulation inputs, outputs, budgets and metrics as
    `pandas.DataFrame` objects and forwards run / state / export requests
    to the wrapped object.
    """

    _c_class = CSimulation

    # Row labels of the metrics table returned by `get_metrics`.
    _metrics_riv_keys_meths = [
        "nse", "kge", "kge_2012",
        "nse_sqrt", "kge_sqrt", "kge_2012_sqrt",
        "nse_log", "kge_log", "kge_2012_log",
        "ratio"
    ]
    _metrics_gw_keys_meths = [
        "nse"
    ]

    # Identifiers handed to the wrapped outputs object. Entries listed as
    # "vector variables" by `get_riverflow_component` / `get_budget` are
    # looked up with ``getVectorVariable`` (one item per groundwater
    # reservoir), all the others with ``getVariable``; this is why some
    # identifiers appear twice within the same group.
    # The insertion order drives the column order of the budget dataframes.
    _variables_ids = {
        'output': {
            'riverflow': 0,
            'watertable': 1,
        },
        'budget_riverflow': {
            'deficit': 18,
            'runoff': 19,
            'runoff_overflow': 20,
            'exchange': 21,
            'pumping': 22,
            'baseflow': 5,
            'gw_overflow': 6,
            'gw_exchange': 7
        },
        'budget': {
            'rainfall': 2,
            'snowfall': 3,
            'potential_evapotranspiration': 4,
            'unmet_potential_evapotranspiration': 5,
            'actual_evapotranspiration': 6,
            'effective_rainfall': 7,
            'height_snowpack': 8,
            'retention_snowpack': 9,
            'height_thornthwaite_reservoir': 10,
            'height_progressive_reservoir': 11,
            'runoff': 12,
            'runoff_overflow': 13,
            'seepage': 14,
            'height_soil': 15,
            'riverflow_local': 16,
            'gw_unmet_pumping': 17,
            'baseflow': 0,
            'drainage': 1,
            'exchanges_flow': 2,
            'groundwater_overflow': 3,
            'groundwater_state': 4,
        }
    }

    _inputs: InputCollection = _GetDerivedTypeDecriptor(
        0, InputCollection
    ) # type: ignore
    simulation_settings: SimulationSettings = _GetDerivedTypeDecriptor(
        0, SimulationSettings
    ) # type: ignore
    output_settings: OutputSettings = _GetDerivedTypeDecriptor(
        0, OutputSettings
    ) # type: ignore
    tree: Tree = _GetDerivedTypeDecriptor(
        0, Tree
    ) # type: ignore
    _ntimestep: int = _IntDescriptor(0) #type: ignore

    def __init__(self) -> None:
        self._m = self._c_class()
        # Declared only (no value is assigned here): these attributes are
        # read by `to_toml`, `write_outputs` and `get_metrics(opti=True)`.
        self.optimization_settings: OptimizationSettings
        self._forecast_settings: ForecastSettings

    def _set_columns(self, header_type):
        """Return the dataframe column labels for `header_type`.

        ``'id'`` gives the keys of the tree connection mapping, ``'name'``
        the names of the corresponding watersheds.

        Raises
        ------
        ValueError
            If `header_type` is neither ``'id'`` nor ``'name'``.
        """
        c = self.tree.connection
        if header_type == 'id':
            return list(c.keys())
        if header_type == 'name':
            watersheds = self.tree.watersheds
            return [watersheds[i].name for i in range(len(c))]
        raise ValueError(
            f"header_type must be 'id' or 'name', got {header_type!r}"
        )

    @_get_datetime
    def _get_date(self, date):
        return date

    def _input_to_dataframe(self, data, header_type, nan_nodata=False):
        """Convert an input object to a dataframe clipped to the simulation
        period.

        Parameters
        ----------
        data:
            Input object exposing ``data``, ``dates`` and ``nodata``.
        header_type: `str`
            ``'id'`` or ``'name'``, see `_set_columns`.
        nan_nodata: `bool`, optional
            Whether values equal to ``data.nodata`` are replaced by NaN.

        Returns
        -------
        `pandas.DataFrame`
            Empty if the input holds no data.
        """
        columns = self._set_columns(header_type)
        if np.size(data.data) == 0:
            return pd.DataFrame()
        df = pd.DataFrame(
            data.data, index=data.dates, columns=columns
        )
        df.index.name = "dates"
        df.columns.name = "watersheds"
        if nan_nodata:
            df = df.where(df != data.nodata, np.nan)
        return df.loc[
            self.simulation_settings.starting_date:
            self.simulation_settings.ending_date, :
        ]

    def _output_to_dataframe(self, data, header_type):
        """Convert an output array to a dataframe clipped to the simulation
        period.

        The index is built from the dates of the wrapped outputs object.
        An empty dataframe is returned when `data` is empty.
        """
        columns = self._set_columns(header_type)
        if np.size(data) == 0:
            return pd.DataFrame()
        df = pd.DataFrame(
            data,
            index=[
                self._get_date(d.getDatetime())
                for d in self._m.getOutputs().getDates()
            ],
            columns=columns
        )
        df.index.name = "dates"
        df.columns.name = "watersheds"
        return df.loc[
            self.simulation_settings.starting_date:
            self.simulation_settings.ending_date, :
        ]

    def _get_budget(
            self,
            variables,
            header_type,
            budget_type,
            vector_variable
        ):
        """Gather budget variables into a single dataframe.

        Parameters
        ----------
        variables: `str`, `list` of `str` or None
            Names to retrieve among the keys of
            ``_variables_ids[budget_type]``. Names that are not keys are
            ignored. None means every variable of the group.
        header_type: `str`
            ``'id'`` or ``'name'``.
        budget_type: `str`
            Key of ``_variables_ids`` (``'budget'`` or
            ``'budget_riverflow'``).
        vector_variable: `list` of `str`
            Names stored per groundwater reservoir. Each one yields one
            entry per reservoir, labelled ``'<variable>#<k>'``.

        Returns
        -------
        `pandas.DataFrame`
            Columns are a (watersheds, variables) multi-index. Empty if
            nothing was selected.
        """
        _check_literal(header_type, ["id", "name"])
        available = self._variables_ids[budget_type]
        if variables is None:
            selected = available
        else:
            if isinstance(variables, str):
                variables = [variables]
            selected = {
                key: value for key, value in available.items()
                if key in variables
            }

        # determine maximum number of groundwater reservoirs
        n_gw_res = max(
            (
                len(watershed.groundwater.reservoirs)
                for watershed in self.tree.watersheds
            ),
            default=0
        )

        # Collect every piece first and concatenate once: growing a
        # dataframe with pd.concat inside the loop copies it each time.
        frames = []
        for variable, id_ in selected.items():
            if variable in vector_variable:
                frames.extend(
                    self._get_vector_variable(
                        f'{variable}#{k}', k, id_, header_type
                    )
                    for k in range(n_gw_res)
                )
            else:
                frames.append(
                    self._get_variable(variable, id_, header_type)
                )

        if not frames:
            # Nothing selected: reorder_levels would fail on flat columns.
            return pd.DataFrame()
        df = pd.concat(frames, axis=1)
        return df.reorder_levels(['watersheds', 'variables'], axis=1)

    def _metrics_to_dataframe(
            self, variable,
            start_date, end_date,
            header_type
        ):
        """Build the metrics table of `variable` between two dates.

        Both dates must belong to the output dates, otherwise
        ``list.index`` raises `ValueError`.

        Returns
        -------
        `pandas.DataFrame`
            One row per metric, one column per watershed.
        """
        nwatershed = len(self.tree.watersheds)
        dates = [
            self._get_date(d.getDatetime())
            for d in self._m.getOutputs().getDates()
        ]
        # Positions are shifted by one before being handed to the wrapper
        # (presumably 1-based indexing on that side -- TODO confirm).
        istart = dates.index(start_date) + 1
        iend = dates.index(end_date) + 1
        data = self._m.getMetrics(nwatershed, istart, iend)
        columns = self._set_columns(header_type)
        if variable == "riverflow":
            # First ten values corresponds to riverflow
            index = self._metrics_riv_keys_meths
            data = data[:, :10].T
        elif variable == "watertable":
            index = self._metrics_gw_keys_meths
            # Last value corresponds to watertable (only nse)
            data = data[:, -1:].T
        else:
            raise ValueError(
                "variable must be 'riverflow' or 'watertable', "
                f"got {variable!r}"
            )
        df = pd.DataFrame(data, index=index, columns=columns)
        df.index.name = "metrics"
        df.columns.name = "watersheds"
        # Values that are not below 1e+20 are reported as NaN.
        df = df.where(df < 1e+20, np.nan)
        return df

    def _create_directory(self, path):
        """Create `path` (and its parents) if it does not exist yet."""
        os.makedirs(path, exist_ok=True)

    def get_input(
            self,
            variable: Literal[
                "rainfall", "pet", "snow", "temperature",
                "riverobs", "groundwaterobs", "riverpumping",
                "groundwaterpumping"
            ] = 'rainfall',
            header_type: Literal["id", "name"] = "name"
        ) -> pd.DataFrame:
        """Get simulation input data.

        Parameters
        ----------
        variable: `str`, optional
            The simulation input variable to retrieve.

            ======================== =======================================
            variable                 description
            ======================== =======================================
            ``'rainfall'``           The simulation input rainfall data.
            ``'pet'``                The simulation input |PET| data.
            ``'snow'``               The simulation input snow data.
            ``'temperature'``        The simulation input temperature data.
            ``'riverobs'``           The river flow observation data.
            ``'groundwaterobs'``     The groundwater level observation data.
            ``'riverpumping'``       The river pumping data.
            ``'groundwaterpumping'`` The groundwater pumping data.
            ======================== =======================================

        header_type: `str`, optional
            The header type of the returned `pandas.DataFrame`.

            ============ ==========================================
            header_type  description
            ============ ==========================================
            ``'id'``     The header corresponds to the watershed
                         identifiers as they were provided to build
                         the `Tree`.
            ``'name'``   The header corresponds to the watershed
                         names.
            ============ ==========================================

        Returns
        -------
        `pandas.DataFrame`
            Restricted to the simulation period, with no-data values
            replaced by NaN.
        """
        _check_literal(
            variable,
            [
                "rainfall", "pet", "snow", "temperature",
                "riverobs", "groundwaterobs", "riverpumping",
                "groundwaterpumping"
            ]
        )
        _check_literal(header_type, ["id", "name"])
        return self._input_to_dataframe(
            getattr(self._inputs, variable), header_type, True
        )

    def get_output(
            self,
            variable: Literal["riverflow", "watertable"] = 'riverflow',
            header_type: Literal["id", "name"] = "name"
        ) -> pd.DataFrame:
        """Get a given output variable.

        Parameters
        ----------
        variable: `str`, optional
            Which output variable to return as a dataframe. The options
            are *riverflow* or *watertable*. By default, *riverflow* is
            returned.

        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        Returns
        -------
        `pandas.DataFrame`
        """
        _check_literal(variable, ["riverflow", "watertable"])
        _check_literal(header_type, ["id", "name"])
        id_ = self._variables_ids['output'][variable]
        return self._output_to_dataframe(
            self._m.getOutputs().getVariable(id_).getData(),
            header_type
        )

    def get_riverflow_component(
            self,
            variables: Optional[Union[str, List[str]]] = None,
            header_type: Literal["id", "name"] = "name",
        ) -> pd.DataFrame:
        """Get the components of the river flow (m3/s)

        Parameters
        ----------
        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        variables: `str`, optional
            The components of the river flow to retrieve. Default is
            None, meaning all the components.

            ======================== =======================================
            variable                 description
            ======================== =======================================
            ``'runoff'``             The runoff component.
            ``'runoff_overflow'``    The runoff overflow component.
            ``'exchange'``           The exchange component. It corresponds
                                     to the exchange_riverflow parameter
                                     value.
            ``'pumping'``            The pumping/injection component.
            ``'baseflow'``           The baseflow component.
            ``'gw_exchange'``        The groundwater exchange component. It
                                     is related to the
                                     groundwater.#.exchanges parameter
                                     value.
            ``'gw_overflow'``        The groundwater overflow component.
            ``'deficit'``            The deficit component. It corresponds
                                     to the water quantity that could not
                                     be taken from the river.
            ======================== =======================================

        Returns
        -------
        `pandas.DataFrame`
            Empty when ``output_settings.budget_riverflow`` is not set.
        """
        if self.output_settings.budget_riverflow:
            return self._get_budget(
                variables, header_type, "budget_riverflow",
                [
                    'baseflow',
                    'gw_exchange',
                    'gw_overflow'
                ]
            )
        else:
            return pd.DataFrame({})

    def get_budget(
            self,
            variables: Optional[Union[str, List[str]]] = None,
            header_type: Literal["id", "name"] = "name",
        ) -> pd.DataFrame:
        """Get the water balance of the reservoirs (mm)

        Parameters
        ----------
        variables: `str` or `list` of `str`, optional
            The budget variables to retrieve, among the keys of
            ``Simulation._variables_ids['budget']``. Names that are not
            keys are ignored. Default is None, meaning all the variables.
            The variables ``'baseflow'``, ``'drainage'``,
            ``'exchanges_flow'``, ``'groundwater_overflow'`` and
            ``'groundwater_state'`` are returned once per groundwater
            reservoir and labelled ``'<variable>#<k>'``.

        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        Returns
        -------
        `pandas.DataFrame`
        """
        return self._get_budget(
            variables, header_type, "budget",
            [
                'baseflow',
                'drainage',
                'exchanges_flow',
                'groundwater_overflow',
                'groundwater_state'
            ]
        )

    def _get_variable(self, var, id_, header_type):
        """Return output variable `id_` with a ``variables`` column level
        labelled `var`."""
        d = self._output_to_dataframe(
            self._m.getOutputs().getVariable(id_).getData(),
            header_type
        )
        # turn variable dataframe columns into multiindex
        d = pd.concat(
            [d], keys=[var], names=['variables'], axis=1
        )
        return d

    def _get_vector_variable(self, var, res, id_, header_type):
        """Return item `res` of vector variable `id_` with a ``variables``
        column level labelled `var`."""
        d = self._output_to_dataframe(
            self._m.getOutputs().getVectorVariable(id_)[res].getData(),
            header_type
        )
        # turn variable dataframe columns into multiindex
        d = pd.concat(
            [d], keys=[var], names=['variables'], axis=1
        )
        return d

    def get_metrics(
            self,
            variable: Literal["riverflow", "watertable"] = 'riverflow',
            header_type: Literal["id", "name"] = "name",
            start_date: Optional[datetime.datetime] = None,
            end_date: Optional[datetime.datetime] = None,
            opti: bool = False
        ) -> pd.DataFrame:
        """Get the metrics.

        Parameters
        ----------
        variable: `str`, optional
            The variable type for which metrics are computed. Possible
            values are 'riverflow' and 'watertable'.

        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        start_date: `datetime.datetime`, optional
            Starting date of the temporal period over which metrics will
            be computed. If None, it is the simulation starting date.
            Default is None.

        end_date: `datetime.datetime`, optional
            Ending date of the temporal period over which metrics will be
            computed. If None, it is the simulation ending date.
            Default is None.

        opti: `bool`, optional
            Whether to retrieve metrics computed over the optimization
            period. If True, `start_date` and `end_date` keyword
            arguments will be ignored. Default to False.

        Returns
        -------
        `pandas.DataFrame`
        """
        _check_literal(variable, ["riverflow", "watertable"])
        _check_literal(header_type, ["id", "name"])
        if start_date is None:
            start_date = self.simulation_settings.starting_date
        if end_date is None:
            end_date = self.simulation_settings.ending_date
        if opti:
            # The optimization period takes precedence over any date given.
            start_date = self.optimization_settings.starting_date
            end_date = self.optimization_settings.ending_date
        return self._metrics_to_dataframe(
            variable, start_date, end_date, header_type
        )

    def run(self, start: int, end: int, update_param: bool = False) -> None:
        """Run simulation from `start` to `end` time steps.

        Parameters
        ----------
        start: `int`
            Starting timestep.
        end: `int`
            Ending timestep.
        update_param: `bool`.
            Reinitialise the simulation parameters

        Raises
        ------
        ValueError
            If `start` is greater than `end`, or if `end` exceeds the
            total number of time steps.
        """
        if start > end:
            raise ValueError("Starting time step greater than ending timestep")
        if end > self._ntimestep:
            raise ValueError("Ending timestep exceed the total number of timestep")
        self._m.run(start, end, update_param)

    @wrap_property(StatesCollection)
    def get_states(self, timestep: int) -> StatesCollection:
        """Get the simulation states for `timestep`.

        Parameters
        ----------
        timestep: `int`
            Timestep for which to get simulation states.

        Returns
        -------
        `~rameau.core.states.StatesCollection`
        """
        return self._m.getStates(timestep)

    # NOTE(review): `wrap_property` is otherwise applied to methods that
    # return states, whereas this one returns None -- confirm the decorator
    # is intended here.
    @wrap_property(StatesCollection)
    def set_states(
            self,
            states: StatesCollection,
            timestep: int,
            only_levels: bool = False
        ) -> None:
        """Set the simulation states for `timestep`.

        Parameters
        ----------
        states: `~rameau.core.states.StatesCollection`
            Simulation states to apply.
        timestep: `int`
            Timestep for which to set the simulation states.
        only_levels: `bool`
            Whether to update only reservoir levels.
        """
        self._m.setStates(states._m, timestep, only_levels)

    @property
    @wrap_property(StatesCollection)
    def final_states(self) -> StatesCollection:
        """ Simulation final states."""
        return self._m.getOutputs().getFinalStates()

    def to_toml(self, path: str) -> None:
        """ Dump the simulation to a TOML file.

        Parameters
        ----------
        path: `str`
            TOML file path.

        Raises
        ------
        RuntimeError
            If the wrapped object reports an error.
        """
        err = self._m.to_toml(
            path,
            self.optimization_settings._m,
            self._forecast_settings._m
        )
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))

    def _prepare_write_output(self, path, kwargs):
        """Create the output directory and resolve the output settings.

        The directory is ``<path>/outputs-<simulation name>``. Each
        attribute listed in ``OutputSettings._computed_attributes`` is
        taken from `kwargs` when present, otherwise from the current
        ``output_settings``.

        Returns
        -------
        `tuple`
            The output directory and the resolved `OutputSettings`.
        """
        path = f"{path}/outputs-{self.simulation_settings.name}"
        self._create_directory(path)
        resolved = {
            key: (
                kwargs[key] if key in kwargs
                else getattr(self.output_settings, key)
            )
            for key in OutputSettings._computed_attributes
        }
        output_opt = OutputSettings(**resolved)
        return path, output_opt

    def write_outputs(
            self,
            path: str = '.',
            **kwargs
        ) -> None:
        """ Dump the simulation outputs to a directory.

        Additional keyword arguments are
        `~rameau.core.settings.OutputSettings` properties.

        Parameters
        ----------
        path: `str`, optional
            Output directory of csv files. Default to current directory.

        kwargs: `dict`, optional
            Keywords related to the output types

        Raises
        ------
        RuntimeError
            If the wrapped object reports an error.
        """
        path, output_opt = self._prepare_write_output(path, kwargs)
        home = os.getcwd()
        os.chdir(path)
        try:
            err = self._m.write_outputs(
                ".",
                self.optimization_settings._m,
                self._forecast_settings._m,
                output_opt._m
            )
        finally:
            # Always restore the caller's working directory, even if the
            # export fails part-way.
            os.chdir(home)
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))
class OptiSimulation(Simulation):
    """rameau optimization simulation.

    Same interface as `Simulation`, backed by a wrapped
    ``COptiSimulation`` object and exposing the optimization settings.
    """

    _c_class = COptiSimulation
    optimization_settings : OptimizationSettings = _GetDerivedTypeDecriptor(
        4, OptimizationSettings
    ) # type: ignore

    def __init__(self) -> None:
        super().__init__()

    def get_opti_metrics(
            self,
            variable: Literal["riverflow", "watertable"] = 'riverflow',
            header_type: Literal["id", "name"] = "name"
        ) -> pd.DataFrame:
        """Get the metrics computed over the optimization time period.

        .. version-deprecated:: 0.3.0
            Use `Simulation.get_metrics` with keyword `opti=True` instead.
        """
        warnings.warn(
            "`get_opti_metrics` is deprecated. Use `get_metrics` instead.",
            DeprecationWarning,
            # Attribute the warning to the caller, not to this module.
            stacklevel=2,
        )
        return self.get_metrics(variable, header_type, opti=True)

    def to_toml(self, path: str) -> None:
        """This method overrides `Simulation.to_toml`

        Raises
        ------
        RuntimeError
            If the wrapped object reports an error.
        """
        err = self._m.to_toml(
            path,
            self.optimization_settings._m,
            self._forecast_settings._m
        )
        # Report failures the same way `Simulation.to_toml` does instead
        # of silently dropping the returned status.
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))

    def write_outputs(self, path: str = '.', **kwargs) -> None:
        """This method overrides `Simulation.write_outputs`

        Raises
        ------
        RuntimeError
            If the wrapped object reports an error.
        """
        path, output_opt = self._prepare_write_output(path, kwargs)
        home = os.getcwd()
        os.chdir(path)
        try:
            err = self._m.write_outputs(
                ".",
                self.optimization_settings._m,
                self._forecast_settings._m,
                output_opt._m
            )
        finally:
            # Always restore the caller's working directory, even if the
            # export fails part-way.
            os.chdir(home)
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))
class ForecastSimulation(OptiSimulation):
    """rameau forecast simulation.

    Backed by a wrapped ``CForecastSimulation`` object. On top of the
    optimization interface it exposes the forecast settings and the
    forecast members.
    """

    _c_class = CForecastSimulation
    _forecast_variables_ids = {
        'output': {
            'riverflow': 0,
            'watertable': 1,
        }
    }
    forecast_settings: ForecastSettings = _GetDerivedTypeDecriptor(
        3, ForecastSettings
    ) # type: ignore

    def __init__(self) -> None:
        # `_c_class` is CForecastSimulation, so the parent initialiser
        # builds the same wrapped object as a direct instantiation.
        super().__init__()

    def get_forecast_output(
            self,
            variable: Literal["riverflow", "watertable"] = 'riverflow',
            header_type: Literal["id", "name"] = "name"
        ) -> pd.DataFrame:
        """Get a given forecast output variable.

        Parameters
        ----------
        variable: `str`, optional
            Which output variable to return as a dataframe. The options
            are *riverflow* or *watertable*. By default, *riverflow* is
            returned.

        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        Returns
        -------
        `pandas.DataFrame`
            Columns carry an extra ``member`` level: ``'norain'`` for the
            first member when ``forecast_settings.norain`` is set, then
            either ``'<quantile>%'`` or the year member, depending on
            ``forecast_settings.quantiles_output``. Empty if there is no
            forecast member.
        """
        _check_literal(variable, ["riverflow", "watertable"])
        _check_literal(header_type, ["id", "name"])
        columns = self._set_columns(header_type)

        # Everything below is identical for all members: fetch it once.
        forecast_outputs = self._m.getForecastOutputs()
        outputs = forecast_outputs.getVariable(
            self._forecast_variables_ids['output'][variable]
        )
        dates = [
            self._get_date(d.getDatetime())
            for d in forecast_outputs.getDates()
        ]
        settings = self.forecast_settings
        norain = settings.norain
        quantiles_output = settings.quantiles_output
        labels = (
            settings.quantiles if quantiles_output
            else settings.year_members
        )

        members = {}
        j = 0  # position among the members other than 'norain'
        for i, output in enumerate(outputs):
            df = pd.DataFrame(
                output.getData(),
                index=dates,
                columns=columns
            )
            df.index.name = "dates"
            df.columns.name = "watersheds"
            if norain and i == 0:
                members['norain'] = df
            else:
                if quantiles_output:
                    members[f'{labels[j]}%'] = df
                else:
                    members[labels[j]] = df
                j = j + 1

        if members:
            return pd.concat(members, axis=1, names=['member'])
        return pd.DataFrame()

    def get_output(
            self,
            variable: Literal["riverflow", "watertable"] = 'riverflow',
            header_type: Literal["id", "name"] = "name"
        ) -> pd.DataFrame:
        """This method overrides `Simulation.get_output`

        The returned dataframe stops at the forecast emission date.
        """
        _check_literal(variable, ["riverflow", "watertable"])
        _check_literal(header_type, ["id", "name"])
        emission_date = self.forecast_settings.emission_date
        data = self._output_to_dataframe(
            self._m.getOutputs()
                .getVariable(self._variables_ids['output'][variable])
                .getData(),
            header_type
        )
        if data.empty:
            # No date index to slice on when there is no output.
            return data
        return data.loc[:emission_date]

    def to_toml(self, path: str) -> None:
        """This method overrides `Simulation.to_toml`

        Raises
        ------
        RuntimeError
            If the wrapped object reports an error.
        """
        err = self._m.to_toml(
            path,
            self.optimization_settings._m,
            self.forecast_settings._m
        )
        # Report failures the same way `Simulation.to_toml` does instead
        # of silently dropping the returned status.
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))

    def write_outputs(self, path: str = '.') -> None:
        """This method overrides `Simulation.write_outputs`

        Unlike the parent implementations it takes no keyword override:
        the current ``output_settings`` are used as they are.

        Raises
        ------
        RuntimeError
            If the wrapped object reports an error.
        """
        path = f"{path}/outputs-{self.simulation_settings.name}"
        self._create_directory(path)
        home = os.getcwd()
        os.chdir(path)
        try:
            err = self._m.write_outputs(
                ".",
                self.optimization_settings._m,
                self.forecast_settings._m,
                self.output_settings._m
            )
        finally:
            # Always restore the caller's working directory, even if the
            # export fails part-way.
            os.chdir(home)
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))