# Copyright 2025, BRGM
#
# This file is part of Rameau.
#
# Rameau is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rameau is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# Rameau. If not, see <https://www.gnu.org/licenses/>.
#
"""
Rameau simulations
"""
import os
import warnings
from typing import Literal, List, Optional, Union
import pandas as pd
import numpy as np
import datetime
from rameau.wrapper import (
CSimulation, COptiSimulation, CForecastSimulation # type: ignore
)
from rameau.core.settings import (
SimulationSettings,
ForecastSettings,
OptimizationSettings,
OutputSettings
)
from rameau.core import Tree
from rameau.core.states import StatesCollection
from rameau.core.inputs import InputCollection
from rameau.core._utils import wrap_property, _check_literal, _get_datetime
from rameau.core._descriptor import _GetDerivedTypeDecriptor, _IntDescriptor
class Simulation():
    """rameau simulations.

    Python front end of the wrapped simulation object stored in ``_m``.
    Outputs, budgets and metrics are returned as `pandas.DataFrame`.
    """

    # Wrapped class instantiated by `__init__`; subclasses override it.
    _c_class = CSimulation
    # Row labels of the river flow metrics, in the order of the leading
    # columns of the array returned by the wrapped ``getMetrics``.
    _metrics_riv_keys_meths = [
        "nse", "kge", "kge_2012",
        "nse_sqrt", "kge_sqrt", "kge_2012_sqrt",
        "nse_log", "kge_log", "kge_2012_log",
        "ratio"
    ]
    # Row label of the groundwater metric (last column of ``getMetrics``).
    _metrics_gw_keys_meths = ["nse"]
    # Identifier of each variable, grouped by getter. The names that the
    # public getters declare as vector variables (one series per
    # groundwater reservoir) are looked up with ``getVectorVariable``, the
    # others with ``getVariable``; both lookups have their own numbering.
    _variables_ids = {
        'output': {
            'riverflow': 0,
            'watertable': 1,
        },
        'budget_riverflow': {
            'deficit': 18,
            'runoff': 19,
            'runoff_overflow': 20,
            'exchange': 21,
            'pumping': 22,
            'baseflow': 5,
            'gw_overflow': 6,
            'gw_exchange': 7
        },
        'budget': {
            'rainfall': 2,
            'snowfall': 3,
            'potential_evapotranspiration': 4,
            'unmet_potential_evapotranspiration': 5,
            'actual_evapotranspiration': 6,
            'effective_rainfall': 7,
            'height_snowpack': 8,
            'retention_snowpack': 9,
            'height_thornthwaite_reservoir': 10,
            'height_progressive_reservoir': 11,
            'runoff': 12,
            'runoff_overflow': 13,
            'seepage': 14,
            'height_soil': 15,
            'riverflow_local': 16,
            'gw_unmet_pumping': 17,
            'baseflow': 0,
            'drainage': 1,
            'exchanges_flow': 2,
            'groundwater_overflow': 3,
            'groundwater_state': 4,
        }
    }
    _inputs: InputCollection = _GetDerivedTypeDecriptor(
        0, InputCollection
    )  # type: ignore
    simulation_settings: SimulationSettings = _GetDerivedTypeDecriptor(
        0, SimulationSettings
    )  # type: ignore
    output_settings: OutputSettings = _GetDerivedTypeDecriptor(
        0, OutputSettings
    )  # type: ignore
    tree: Tree = _GetDerivedTypeDecriptor(
        0, Tree
    )  # type: ignore
    _ntimestep: int = _IntDescriptor(0)  # type: ignore

    def __init__(self) -> None:
        self._m = self._c_class()
        # Annotations only: they do not create the attributes, which have
        # to be assigned before `get_metrics(opti=True)`, `to_toml` or
        # `write_outputs` read them.
        self.optimization_settings: OptimizationSettings
        self._forecast_settings: ForecastSettings

    def _set_columns(self, header_type):
        """Return the dataframe column labels for `header_type`.

        Parameters
        ----------
        header_type: `str`
            ``'id'`` to use the keys of the tree connection, ``'name'``
            to use the watershed names.

        Returns
        -------
        `list`

        Raises
        ------
        ValueError
            If `header_type` is neither ``'id'`` nor ``'name'``.
        """
        connection = self.tree.connection
        if header_type == 'id':
            return list(connection.keys())
        if header_type == 'name':
            watersheds = self.tree.watersheds
            return [watersheds[i].name for i in range(len(connection))]
        raise ValueError(
            f"header_type must be 'id' or 'name', got {header_type!r}"
        )

    @_get_datetime
    def _get_date(self, date):
        """Return `date` after processing by the `_get_datetime` decorator."""
        return date

    def _input_to_dataframe(self, data, header_type, nan_nodata=False):
        """Turn an input object into a dataframe over the simulation period.

        Parameters
        ----------
        data:
            Object exposing ``data``, ``dates`` and ``nodata`` attributes.
        header_type: `str`
            ``'id'`` or ``'name'``, see `_set_columns`.
        nan_nodata: `bool`, optional
            Whether to replace values equal to ``data.nodata`` by NaN.

        Returns
        -------
        `pandas.DataFrame`
            Empty when ``data.data`` holds no value.
        """
        columns = self._set_columns(header_type)
        if np.size(data.data) == 0:
            return pd.DataFrame()
        df = pd.DataFrame(data.data, index=data.dates, columns=columns)
        df.index.name = "dates"
        df.columns.name = "watersheds"
        if nan_nodata:
            df = df.where(df != data.nodata, np.nan)
        settings = self.simulation_settings
        return df.loc[settings.starting_date:settings.ending_date, :]

    def _output_to_dataframe(self, data, header_type):
        """Turn an output array into a dataframe over the simulation period.

        The index is built from the dates of the wrapped outputs.

        Parameters
        ----------
        data:
            Array-like of output values.
        header_type: `str`
            ``'id'`` or ``'name'``, see `_set_columns`.

        Returns
        -------
        `pandas.DataFrame`
            Empty when `data` holds no value.
        """
        columns = self._set_columns(header_type)
        if np.size(data) == 0:
            return pd.DataFrame()
        dates = [
            self._get_date(d.getDatetime())
            for d in self._m.getOutputs().getDates()
        ]
        df = pd.DataFrame(data, index=dates, columns=columns)
        df.index.name = "dates"
        df.columns.name = "watersheds"
        settings = self.simulation_settings
        return df.loc[settings.starting_date:settings.ending_date, :]

    def _get_budget(
        self,
        variables,
        header_type,
        budget_type,
        vector_variable
    ):
        """Gather budget variables into a single dataframe.

        Parameters
        ----------
        variables: `str` or `list` of `str` or None
            Variables to retrieve among the keys of
            ``_variables_ids[budget_type]``. None selects all of them.
            Names that are not keys of that dictionary are ignored.
        header_type: `str`
            ``'id'`` or ``'name'``, see `_set_columns`.
        budget_type: `str`
            Key of ``_variables_ids``.
        vector_variable: `list` of `str`
            Variables retrieved once per groundwater reservoir. Their
            columns are labelled ``'<variable>#<reservoir index>'``.

        Returns
        -------
        `pandas.DataFrame`
            Columns are a (watersheds, variables) multiindex. Empty when
            no variable is selected.
        """
        _check_literal(header_type, ["id", "name"])
        selected = self._variables_ids[budget_type]
        if variables is not None:
            if isinstance(variables, str):
                variables = [variables]
            selected = {
                key: value for key, value in selected.items()
                if key in variables
            }
        # determine maximum number of groundwater reservoirs
        n_gw_res = max(
            (len(w.groundwater.reservoirs) for w in self.tree.watersheds),
            default=0
        )
        # Collect every frame first and concatenate once: this avoids
        # re-copying a growing dataframe at each iteration and keeps the
        # empty seed dataframe out of the concatenation.
        frames = []
        for variable, id_ in selected.items():
            if variable in vector_variable:
                frames.extend(
                    self._get_vector_variable(
                        f'{variable}#{k}', k, id_, header_type
                    )
                    for k in range(n_gw_res)
                )
            else:
                frames.append(
                    self._get_variable(variable, id_, header_type)
                )
        if not frames:
            # Nothing to gather: reorder_levels would fail on flat columns.
            return pd.DataFrame()
        df = pd.concat(frames, axis=1)
        return df.reorder_levels(['watersheds', 'variables'], axis=1)

    def _metrics_to_dataframe(
        self, variable, start_date, end_date, header_type
    ):
        """Return the metrics computed between `start_date` and `end_date`.

        Parameters
        ----------
        variable: `str`
            ``'riverflow'`` or ``'watertable'``.
        start_date, end_date: `datetime.datetime`
            Bounds of the period; both must be dates of the outputs.
        header_type: `str`
            ``'id'`` or ``'name'``, see `_set_columns`.

        Returns
        -------
        `pandas.DataFrame`
            One row per metric, one column per watershed.

        Raises
        ------
        ValueError
            If a bound is not an output date or `variable` is unknown.
        """
        nwatershed = len(self.tree.watersheds)
        dates = [
            self._get_date(d.getDatetime())
            for d in self._m.getOutputs().getDates()
        ]
        try:
            # +1: presumably the wrapped library expects 1-based time
            # indices -- TODO confirm.
            istart = dates.index(start_date) + 1
            iend = dates.index(end_date) + 1
        except ValueError:
            raise ValueError(
                f"Metrics period bounds ({start_date}, {end_date}) must "
                "both be dates of the simulation outputs"
            ) from None
        data = self._m.getMetrics(nwatershed, istart, iend)
        columns = self._set_columns(header_type)
        if variable == "riverflow":
            # First values correspond to riverflow
            index = self._metrics_riv_keys_meths
            data = data[:, :len(index)].T
        elif variable == "watertable":
            # Last value corresponds to watertable (only nse)
            index = self._metrics_gw_keys_meths
            data = data[:, -len(index):].T
        else:
            raise ValueError(
                "variable must be 'riverflow' or 'watertable', "
                f"got {variable!r}"
            )
        df = pd.DataFrame(data, index=index, columns=columns)
        df.index.name = "metrics"
        df.columns.name = "watersheds"
        # Values of 1e20 or more are reported as NaN (presumably the
        # missing-value marker of the wrapped library -- TODO confirm).
        df = df.where(df < 1e+20, np.nan)
        return df

    def _create_directory(self, path):
        """Create `path` and its parents if they do not exist yet."""
        os.makedirs(path, exist_ok=True)

    def get_output(
        self,
        variable: Literal["riverflow", "watertable"] = 'riverflow',
        header_type: Literal["id", "name"] = "name"
    ) -> pd.DataFrame:
        """Get a given output variable.

        Parameters
        ----------
        variable: `str`, optional
            Which output variable to return as a dataframe. The options
            are *riverflow* or *watertable*. By default, *riverflow* is
            returned.

        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        Returns
        -------
        `pandas.DataFrame`
        """
        _check_literal(variable, ["riverflow", "watertable"])
        _check_literal(header_type, ["id", "name"])
        id_ = self._variables_ids['output'][variable]
        return self._output_to_dataframe(
            self._m.getOutputs().getVariable(id_).getData(), header_type
        )

    def get_riverflow_component(
        self,
        variables: Optional[Union[str, List[str]]] = None,
        header_type: Literal["id", "name"] = "name",
    ) -> pd.DataFrame:
        """Get the components of the river flow (m3/s)

        Parameters
        ----------
        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        variables: `str`, optional
            The components of the river flow to retrieve. Default is None,
            meaning all the components.

            ======================== =========================================
            variable                 description
            ======================== =========================================
            ``'runoff'``             The runoff component.
            ``'runoff_overflow'``    The runoff overflow component.
            ``'exchange'``           The exchange component. It corresponds to
                                     the exchange_riverflow parameter value.
            ``'pumping'``            The pumping/injection component.
            ``'baseflow'``           The baseflow component.
            ``'gw_exchange'``        The groundwater exchange component. It
                                     is related to the groundwater.#.exchanges
                                     parameter value.
            ``'gw_overflow'``        The groundwater overflow component.
            ``'deficit'``            The deficit component. It corresponds to
                                     the water quantity that couldn't be
                                     taken from the river.
            ======================== =========================================

        Returns
        -------
        `pandas.DataFrame`
            Empty when the ``budget_riverflow`` output setting is off.
        """
        if not self.output_settings.budget_riverflow:
            return pd.DataFrame({})
        return self._get_budget(
            variables,
            header_type,
            "budget_riverflow",
            ['baseflow', 'gw_exchange', 'gw_overflow']
        )

    def get_budget(
        self,
        variables: Optional[Union[str, List[str]]] = None,
        header_type: Literal["id", "name"] = "name",
    ) -> pd.DataFrame:
        """Get the water balance of the reservoirs (mm)

        Parameters
        ----------
        variables: `str` or `list` of `str`, optional
            The budget variables to retrieve. Default is None, meaning
            all the variables.

        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        Returns
        -------
        `pandas.DataFrame`
        """
        return self._get_budget(
            variables,
            header_type,
            "budget",
            [
                'baseflow', 'drainage', 'exchanges_flow',
                'groundwater_overflow', 'groundwater_state'
            ]
        )

    def _get_variable(self, var, id_, header_type):
        """Return variable `id_` with a ``variables`` column level `var`."""
        d = self._output_to_dataframe(
            self._m.getOutputs().getVariable(id_).getData(),
            header_type
        )
        # turn variable dataframe columns into multiindex
        return pd.concat([d], keys=[var], names=['variables'], axis=1)

    def _get_vector_variable(self, var, res, id_, header_type):
        """Return item `res` of vector variable `id_`, labelled `var`."""
        d = self._output_to_dataframe(
            self._m.getOutputs().getVectorVariable(id_)[res].getData(),
            header_type
        )
        # turn variable dataframe columns into multiindex
        return pd.concat([d], keys=[var], names=['variables'], axis=1)

    def get_metrics(
        self,
        variable: Literal["riverflow", "watertable"] = 'riverflow',
        header_type: Literal["id", "name"] = "name",
        start_date: Optional[datetime.datetime] = None,
        end_date: Optional[datetime.datetime] = None,
        opti: bool = False
    ) -> pd.DataFrame:
        """Get the metrics.

        Parameters
        ----------
        variable: `str`, optional
            The variable type for which metrics are computed. Possible values
            are 'riverflow' and 'watertable'.

        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        start_date: `datetime.datetime`, optional
            Starting date of the temporal period over which
            metrics will be computed. If None, it is the simulation
            starting date. Default is None.

        end_date: `datetime.datetime`, optional
            Ending date of the temporal period over which
            metrics will be computed. If None, it is the simulation
            ending date. Default is None.

        opti: `bool`, optional
            Whether to retrieve metrics computed over
            the optimization period. If True, `start_date`
            and `end_date` keyword arguments will be ignored.
            Default to False.

        Returns
        -------
        `pandas.DataFrame`
        """
        _check_literal(variable, ["riverflow", "watertable"])
        _check_literal(header_type, ["id", "name"])
        if opti:
            start_date = self.optimization_settings.starting_date
            end_date = self.optimization_settings.ending_date
        else:
            if start_date is None:
                start_date = self.simulation_settings.starting_date
            if end_date is None:
                end_date = self.simulation_settings.ending_date
        return self._metrics_to_dataframe(
            variable, start_date, end_date, header_type
        )

    def run(self, start: int, end: int, update_param: bool = False) -> None:
        """Run simulation from `start` to `end` time steps.

        Parameters
        ----------
        start: `int`
            Starting timestep.

        end: `int`
            Ending timestep.

        update_param: `bool`.
            Reinitialise the simulation parameters

        Raises
        ------
        ValueError
            If `start` is greater than `end`, or `end` is greater than
            the total number of time steps.
        """
        if start > end:
            raise ValueError("Starting time step greater than ending timestep")
        if end > self._ntimestep:
            raise ValueError("Ending timestep exceed the total number of timestep")
        self._m.run(start, end, update_param)

    @wrap_property(StatesCollection)
    def get_states(self, timestep: int) -> StatesCollection:
        """Get the simulation states for `timestep`.

        Parameters
        ----------
        timestep: `int`
            Timestep for which to get simulation states.

        Returns
        -------
        `~rameau.core.states.StatesCollection`
        """
        return self._m.getStates(timestep)

    # NOTE(review): `wrap_property` is applied although this method
    # returns nothing -- confirm that the decorator is intended here.
    @wrap_property(StatesCollection)
    def set_states(
        self,
        states: StatesCollection,
        timestep: int,
        only_levels: bool = False
    ) -> None:
        """Set the simulation states for `timestep`.

        Parameters
        ----------
        states: `~rameau.core.states.StatesCollection`
            Simulation states to apply.

        timestep: `int`
            Timestep for which to set the simulation states.

        only_levels: `bool`
            Whether to update only reservoir levels.
        """
        self._m.setStates(states._m, timestep, only_levels)

    @property
    @wrap_property(StatesCollection)
    def final_states(self) -> StatesCollection:
        """ Simulation final states."""
        return self._m.getOutputs().getFinalStates()

    def to_toml(self, path: str) -> None:
        """
        Dump the simulation to a TOML file.

        Parameters
        ----------
        path: `str`
            TOML file path.

        Raises
        ------
        RuntimeError
            If the wrapped call reports a non-zero error status.
        """
        err = self._m.to_toml(
            path,
            self.optimization_settings._m,
            self._forecast_settings._m
        )
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))

    def _prepare_write_output(self, path, kwargs):
        """Create the output directory and build the output settings.

        Parameters
        ----------
        path: `str`
            Parent directory; outputs go to
            ``<path>/outputs-<simulation name>``.
        kwargs: `dict`
            Overrides of the `OutputSettings` computed attributes. Missing
            keys fall back to ``self.output_settings``; other keys are
            ignored.

        Returns
        -------
        `tuple`
            The output directory and the resulting `OutputSettings`.
        """
        path = f"{path}/outputs-{self.simulation_settings.name}"
        self._create_directory(path)
        options = {
            key: (
                kwargs[key] if key in kwargs
                else getattr(self.output_settings, key)
            )
            for key in OutputSettings._computed_attributes
        }
        return path, OutputSettings(**options)

    def write_outputs(
        self,
        path: str = '.',
        **kwargs
    ) -> None:
        """
        Dump the simulation outputs to a directory.

        Additional keyword arguments are `~rameau.core.settings.OutputSettings`
        properties.

        Parameters
        ----------
        path: `str`, optional
            Output directory of csv files. Default to current directory.

        kwargs: `dict`, optional
            Keywords related to the output types

        Raises
        ------
        RuntimeError
            If the wrapped call reports a non-zero error status.
        """
        path, output_opt = self._prepare_write_output(path, kwargs)
        home = os.getcwd()
        os.chdir(path)
        try:
            err = self._m.write_outputs(
                ".",
                self.optimization_settings._m,
                self._forecast_settings._m,
                output_opt._m
            )
        finally:
            # Always restore the caller's working directory, even when the
            # wrapped call raises.
            os.chdir(home)
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))
class OptiSimulation(Simulation):
    """rameau optimization simulation."""

    _c_class = COptiSimulation
    optimization_settings: OptimizationSettings = _GetDerivedTypeDecriptor(
        4, OptimizationSettings
    )  # type: ignore

    def __init__(self) -> None:
        super().__init__()

    def get_opti_metrics(
        self,
        variable: Literal["riverflow", "watertable"] = 'riverflow',
        header_type: Literal["id", "name"] = "name"
    ) -> pd.DataFrame:
        """Get the metrics computed over the optimization time period.

        .. version-deprecated:: 0.3.0
            Use `Simulation.get_metrics` with keyword `opti=True` instead.
        """
        warnings.warn(
            "`get_opti_metrics` is deprecated. Use `get_metrics` instead.",
            DeprecationWarning,
            # Attribute the warning to the caller rather than to this line.
            stacklevel=2,
        )
        return self.get_metrics(variable, header_type, opti=True)

    def to_toml(self, path: str) -> None:
        """This method overrides `Simulation.to_toml`

        Raises
        ------
        RuntimeError
            If the wrapped call reports a non-zero error status.
        """
        err = self._m.to_toml(
            path,
            self.optimization_settings._m,
            self._forecast_settings._m
        )
        # The status used to be discarded here, unlike in
        # `Simulation.to_toml`. The None guard covers a wrapped method
        # that would not return a status -- TODO confirm and simplify.
        if err is not None and err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))

    def write_outputs(self, path: str = '.', **kwargs) -> None:
        """This method overrides `Simulation.write_outputs`

        Raises
        ------
        RuntimeError
            If the wrapped call reports a non-zero error status.
        """
        path, output_opt = self._prepare_write_output(path, kwargs)
        home = os.getcwd()
        os.chdir(path)
        try:
            err = self._m.write_outputs(
                ".",
                self.optimization_settings._m,
                self._forecast_settings._m,
                output_opt._m
            )
        finally:
            # Always restore the caller's working directory, even when the
            # wrapped call raises.
            os.chdir(home)
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))
class ForecastSimulation(OptiSimulation):
    """rameau forecast simulation."""

    _c_class = CForecastSimulation
    # Identifiers passed to ``getForecastOutputs().getVariable``.
    _forecast_variables_ids = {
        'output': {
            'riverflow': 0,
            'watertable': 1,
        }
    }
    forecast_settings: ForecastSettings = _GetDerivedTypeDecriptor(
        3, ForecastSettings
    )  # type: ignore

    def __init__(self) -> None:
        # The parent initialiser instantiates `_c_class`, i.e.
        # `CForecastSimulation` for this class.
        super().__init__()

    def get_forecast_output(
        self,
        variable: Literal["riverflow", "watertable"] = 'riverflow',
        header_type: Literal["id", "name"] = "name"
    ) -> pd.DataFrame:
        """Get a given output variable.

        Parameters
        ----------
        variable: `str`, optional
            Which output variable to return as a dataframe. The options
            are *riverflow* or *watertable*. By default, *riverflow* is
            returned.

        header_type: `str`, optional
            Whether to use the watershed identifiers or names as
            dataframe header.

        Returns
        -------
        `pandas.DataFrame`
            Columns carry an outer ``member`` level: ``'norain'``, a
            quantile label such as ``'50%'`` or a year member. Empty
            when there is no forecast output.
        """
        _check_literal(variable, ["riverflow", "watertable"])
        _check_literal(header_type, ["id", "name"])
        columns = self._set_columns(header_type)
        forecast_outputs = self._m.getForecastOutputs()
        outputs = forecast_outputs.getVariable(
            self._forecast_variables_ids['output'][variable]
        )
        # Every member shares the same dates and settings: read them once
        # instead of once per member.
        dates = [
            self._get_date(d.getDatetime())
            for d in forecast_outputs.getDates()
        ]
        settings = self.forecast_settings
        members = {}
        # Position in `quantiles` / `year_members`; it does not advance
        # for the leading 'norain' output.
        j = 0
        for i, output in enumerate(outputs):
            df = pd.DataFrame(output.getData(), index=dates, columns=columns)
            df.index.name = "dates"
            df.columns.name = "watersheds"
            if settings.norain and i == 0:
                members['norain'] = df
                continue
            if settings.quantiles_output:
                members[f'{settings.quantiles[j]}%'] = df
            else:
                members[settings.year_members[j]] = df
            j += 1
        if not members:
            return pd.DataFrame()
        return pd.concat(members, axis=1, names=['member'])

    def get_output(
        self,
        variable: Literal["riverflow", "watertable"] = 'riverflow',
        header_type: Literal["id", "name"] = "name"
    ) -> pd.DataFrame:
        """This method overrides `Simulation.get_output`

        The returned dataframe stops at the forecast emission date.
        """
        _check_literal(variable, ["riverflow", "watertable"])
        _check_literal(header_type, ["id", "name"])
        emission_date = self.forecast_settings.emission_date
        id_ = self._variables_ids['output'][variable]
        data = self._output_to_dataframe(
            self._m.getOutputs().getVariable(id_).getData(),
            header_type
        )
        return data.loc[:emission_date]

    def to_toml(self, path: str) -> None:
        """This method overrides `Simulation.to_toml`

        Raises
        ------
        RuntimeError
            If the wrapped call reports a non-zero error status.
        """
        err = self._m.to_toml(
            path,
            self.optimization_settings._m,
            self.forecast_settings._m
        )
        # The status used to be discarded here, unlike in
        # `Simulation.to_toml`. The None guard covers a wrapped method
        # that would not return a status -- TODO confirm and simplify.
        if err is not None and err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))

    def write_outputs(self, path: str = '.') -> None:
        """This method overrides `Simulation.write_outputs`

        Outputs are written to ``<path>/outputs-<simulation name>`` using
        the simulation output settings.

        Raises
        ------
        RuntimeError
            If the wrapped call reports a non-zero error status.
        """
        path = f"{path}/outputs-{self.simulation_settings.name}"
        self._create_directory(path)
        home = os.getcwd()
        os.chdir(path)
        try:
            err = self._m.write_outputs(
                ".",
                self.optimization_settings._m,
                self.forecast_settings._m,
                self.output_settings._m
            )
        finally:
            # Always restore the caller's working directory, even when the
            # wrapped call raises.
            os.chdir(home)
        if err.getInt(0) != 0:
            raise RuntimeError(err.getString(0))