# Copyright 2025, BRGM
#
# This file is part of Rameau.
#
# Rameau is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# Rameau is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
# PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# Rameau. If not, see <https://www.gnu.org/licenses/>.
#
"""
Rameau simulations
"""
import os
from typing import Literal, List, Optional, Any
from collections import defaultdict
import pandas as pd
import numpy as np
from rameau.core.inputs.input import Input
from rameau.wrapper import (
CSimulation, COptiSimulation, CForecastSimulation # type: ignore
)
from rameau.core.settings import (
SimulationSettings,
ForecastSettings,
OptimizationSettings,
OutputSettings
)
from rameau.core import Tree
from rameau.core.states import StatesCollection
from rameau.core.inputs import InputCollection
from rameau.core._utils import wrap_property, _check_literal, _get_datetime
class _PointerDescriptor():
"""
Pointer descriptor.
"""
def __init__(self, id, obj):
self._id = id
self._obj = obj
def __get__(self, instance, type=None) -> Any:
b = self._obj.__new__(self._obj)
meth = getattr(instance._m, f"getPointer{self._id}")
b._m = meth(self._id)
return b
[docs]
class Simulation():
"""rameau simulations."""
_c_class = CSimulation
_metrics_riv_keys_meths = {
"nse": 0,
"kge": 1,
"kge_2012": 2,
"nse_sqrt": 3,
"kge_sqrt": 4,
"kge_2012_sqrt": 5,
"nse_log": 6,
"kge_log": 7,
"kge_2012_log": 8,
"ratio": 9,
}
_metrics_gw_keys_meths = {
"nse": 0,
}
_variables_ids = {
'output': {
'riverflow': 0,
'watertable': 1,
'riverpump_deficit': 17,
},
'budget': {
'rainfall': 2,
'snowfall': 3,
'potential_evapotranspiration': 4,
'unmet_potential_evapotranspiration': 5,
'actual_evapotranspiration': 6,
'effective_rainfall': 7,
'height_snowpack': 8,
'retention_snowpack': 9,
'height_thornthwaite_reservoir': 10,
'height_progressive_reservoir': 11,
'runoff': 12,
'runoff_overflow': 13,
'seepage': 14,
'height_soil': 15,
'riverflow_local': 16,
'baseflow': 0,
'drainage': 1,
'exchanges_flow': 2,
'groundwater_overflow': 3,
'groundwater_state': 4,
}
}
_inputs: InputCollection = _PointerDescriptor(
0, InputCollection
) # type: ignore
simulation_settings: SimulationSettings = _PointerDescriptor(
1, SimulationSettings
) # type: ignore
output_settings: OutputSettings = _PointerDescriptor(
2, OutputSettings
) # type: ignore
tree: Tree = _PointerDescriptor(
7, Tree
) # type: ignore
def __init__(self) -> None:
self._m = self._c_class()
self._optimization_settings: OptimizationSettings
self._forecast_settings: ForecastSettings
def _set_columns(self, header_type):
c = self.tree.connection
if header_type == 'id':
columns = list(c.keys())
elif header_type == 'name':
columns = [self.tree.watersheds[i].name for i in range(len(c))]
return columns
@_get_datetime
def _get_date(self, date):
return date
def _input_to_dataframe(self, data, header_type, nan_nodata=False):
columns = self._set_columns(header_type)
if np.size(data.data) == 0:
return pd.DataFrame()
df = pd.DataFrame(
data.data,
index=data.dates,
columns=columns
)
df.index.name = "dates"
df.columns.name = "watersheds"
if nan_nodata:
df = df.where(df != data.nodata, np.nan)
return df
def _output_to_dataframe(self, data, header_type):
columns = self._set_columns(header_type)
if np.size(data) == 0:
return pd.DataFrame()
df = pd.DataFrame(
data,
index=[
self._get_date(d.getDatetime())
for d in self._m.getOutputs().getDates()
],
columns=columns
)
df.index.name = "dates"
df.columns.name = "watersheds"
return df
def _metrics_to_dataframe(self, scores, columns, header_type):
if len(scores) > 0:
data = defaultdict(lambda :[])
index = self._set_columns(header_type)
for i in range(len(self.tree.connection)):
for key, value in columns.items():
data[key].append(scores[i].getFloat(value))
df = pd.DataFrame(data, index=index).T
df.index.name = "metrics"
df.columns.name = "watersheds"
df = df.where(df < 1e+20, np.nan)
return df
else:
return pd.DataFrame()
def _create_directory(self, path):
os.makedirs(path, exist_ok=True)
[docs]
def get_output(
self,
variable: Literal["riverflow", "watertable"] = 'riverflow',
header_type: Literal["id", "name"] = "name"
) -> pd.DataFrame:
"""Get a given output variable.
Parameters
----------
variable: `str`, optional
Which output variable to return as a dataframe. The options
are *riverflow* or *watertable*. By default, *riverflow* is
returned.
header_type: `str`, optional
Whether to use the watershed identifiers or names as
dataframe header.
Returns
-------
`pandas.DataFrame`
"""
_check_literal(variable, ["riverflow", "watertable"])
_check_literal(header_type, ["id", "name"])
id_ = self._variables_ids['output'][variable]
return self._output_to_dataframe(
self._m.getOutputs().getVariable(id_).getData(), header_type
)
[docs]
def get_budget(
self,
variables: Optional[List[str]] = None,
header_type: Literal["id", "name"] = "name",
) -> pd.DataFrame:
"""Get the water balance.
Parameters
----------
header_type: `str`, optional
Whether to use the watershed identifiers or names as
dataframe header.
variables: `str`, optional
Returns
-------
`pandas.DataFrame`
"""
_check_literal(header_type, ["id", "name"])
if variables is not None:
variables2 = {
key:value for key, value in self._variables_ids["budget"].items()
if key in variables
}
else:
variables2 = self._variables_ids['budget']
# determine maximum number of groundwater reservoirs
n_gw_res = 0
for watershed in self.tree.watersheds:
n = len(watershed.groundwater.reservoirs)
if n > n_gw_res:
n_gw_res = n
df = pd.DataFrame()
# gather budget variables into a single dataframe
for variable, id_ in variables2.items():
if variable in [
'baseflow', 'drainage', 'exchanges_flow',
'groundwater_overflow', 'groundwater_state'
]:
for k in range(0, n_gw_res):
df = pd.concat(
[
df,
self._get_vector_variable(
f'{variable}#{k}', k, id_, header_type
)
],
axis=1
)
else:
df = pd.concat(
[df, self._get_variable(variable, id_, header_type)],
axis=1
)
return df.reorder_levels(['watersheds', 'variables'], axis=1)
def _get_variable(self, var, id_, header_type):
d = self._output_to_dataframe(
self._m.getOutputs().getVariable(id_).getData(),
header_type
)
# turn variable dataframe columns into multiindex
d = pd.concat(
[d], keys=[var], names=['variables'], axis=1
)
return d
def _get_vector_variable(self, var, res, id_, header_type):
d = self._output_to_dataframe(
self._m.getOutputs().getVectorVariable(id_)[res].getData(),
header_type
)
# turn variable dataframe columns into multiindex
d = pd.concat(
[d], keys=[var], names=['variables'], axis=1
)
return d
[docs]
def get_metrics(
self,
variable: Literal["riverflow", "watertable"] = 'riverflow',
header_type: Literal["id", "name"] = "name"
) -> pd.DataFrame:
"""Get the metrics.
Parameters
----------
header_type: `str`, optional
Whether to use the watershed identifiers or names as
dataframe header.
Returns
-------
`pandas.DataFrame`
"""
_check_literal(variable, ["riverflow", "watertable"])
_check_literal(header_type, ["id", "name"])
if variable == 'riverflow':
return self._metrics_to_dataframe(
self._m.getOutputs().getMetricsRiverflow(),
self._metrics_riv_keys_meths,
header_type
)
elif variable == 'watertable':
return self._metrics_to_dataframe(
self._m.getOutputs().getMetricsWatertable(),
self._metrics_gw_keys_meths,
header_type
)
[docs]
def run(self, start: int, end: int, update_param: bool = False) -> None:
"""Run simulation from `start` to `end` time steps.
Parameters
----------
start: `int`
Starting timestep.
end: `int`
Ending timestep.
update_param: `bool`.
Reinitialise the simulation parameters
"""
self._m.run(start, end, update_param)
[docs]
@wrap_property(StatesCollection)
def get_states(self, timestep: int) -> StatesCollection:
"""Get the simulation states for `timestep`.
Parameters
----------
timestep: `int`
Timestep for which to get simulation states.
Returns
-------
`StatesCollection`
"""
return self._m.getStates(timestep)
[docs]
@wrap_property(StatesCollection)
def set_states(
self,
states: StatesCollection,
timestep: int,
only_levels: bool = False
) -> None:
"""Set the simulation states for `timestep`.
Parameters
----------
states: `StatesCollection`
Simulation states to apply.
timestep: `int`
Timestep for which to set the simulation states.
only_levels: `bool`
Whether to update only reservoir levels.
"""
self._m.setStates(states._m, timestep, only_levels)
@property
@wrap_property(StatesCollection)
def final_states(self) -> StatesCollection:
return self._m.getOutputs().getFinalStates()
[docs]
def to_toml(self, path: str) -> None:
"""
Dump the simulation to a TOML file.
Parameters
----------
path: `str`
TOML file path.
"""
err = self._m.to_toml(
path,
self._optimization_settings._m,
self._forecast_settings._m
)
if err.getInt(0) != 0:
raise RuntimeError(err.getString(0))
def _prepare_write_output(self, path, kwargs):
path = f"{path}/outputs-{self.simulation_settings.name}"
self._create_directory(path)
kwargs2 = {}
for key in OutputSettings._computed_attributes:
if key in kwargs:
kwargs2[key] = kwargs[key]
else:
kwargs2[key] = getattr(self.output_settings, key)
output_opt = OutputSettings(**kwargs2)
return path, output_opt
[docs]
def write_outputs(
self,
path: str = '.',
**kwargs
) -> None:
"""
Dump the simulation outputs to a directory.
Additional keyword arguments are `OutputSettings` properties.
Parameters
----------
path: `str`, optional
Output directory of csv files. Default to current directory.
kwargs:
"""
path, output_opt = self._prepare_write_output(path, kwargs)
err = self._m.write_outputs(
path,
self._optimization_settings._m,
self._forecast_settings._m,
output_opt._m
)
if err.getInt(0) != 0:
raise RuntimeError(err.getString(0))
[docs]
class OptiSimulation(Simulation):
"""rameau optimization simulation."""
_c_class = COptiSimulation
optimization_settings : OptimizationSettings = _PointerDescriptor(
4, OptimizationSettings
) # type: ignore
def __init__(self) -> None:
super().__init__()
[docs]
def get_opti_metrics(
self,
variable: Literal["riverflow", "watertable"] = 'riverflow',
header_type: Literal["id", "name"] = "name"
) -> pd.DataFrame:
_check_literal(variable, ["riverflow", "watertable"])
_check_literal(header_type, ["id", "name"])
if variable == 'riverflow':
return self._metrics_to_dataframe(
self._m.getOutputs().getMetricsOptiRiverflow(),
self._metrics_riv_keys_meths,
header_type
)
elif variable == 'watertable':
return self._metrics_to_dataframe(
self._m.getOutputs().getMetricsOptiWatertable(),
self._metrics_gw_keys_meths,
header_type
)
[docs]
def to_toml(self, path: str) -> None:
"""This method overrides `Simulation.to_toml`"""
self._m.to_toml(
path,
self.optimization_settings._m,
self._forecast_settings._m
)
[docs]
def write_outputs(self, path: str = '.', **kwargs) -> None:
"""This method overrides `Simulation.write_outputs`"""
path, output_opt = self._prepare_write_output(path, kwargs)
err = self._m.write_outputs(
path,
self.optimization_settings._m,
self._forecast_settings._m,
output_opt._m
)
if err.getInt(0) != 0:
raise RuntimeError(err.getString(0))
[docs]
class ForecastSimulation(OptiSimulation):
"""rameau forecast simulation."""
_c_class = CForecastSimulation
_forecast_variables_ids = {
'output': {
'riverflow': 0,
'watertable': 1,
}
}
forecast_settings: ForecastSettings = _PointerDescriptor(
3, ForecastSettings
) # type: ignore
def __init__(self) -> None:
self._m = CForecastSimulation()
[docs]
def get_forecast_output(
self,
variable: Literal["riverflow", "watertable"] = 'riverflow',
header_type: Literal["id", "name"] = "name"
) -> pd.DataFrame:
_check_literal(variable, ["riverflow", "watertable"])
_check_literal(header_type, ["id", "name"])
columns = self._set_columns(header_type)
df2 = {}
j = 0
outputs = self._m.getForecastOutputs().getVariable(
self._forecast_variables_ids['output'][variable]
)
for i, output in enumerate(outputs):
df = pd.DataFrame(
output.getData(),
index=[
self._get_date(d.getDatetime())
for d in self._m.getForecastOutputs().getDates()
],
columns=columns
)
df.index.name = "dates"
df.columns.name = "watersheds"
if self.forecast_settings.norain and i == 0:
df2['norain'] = df
else:
if self.forecast_settings.quantiles_output:
df2[f'{self.forecast_settings.quantiles[j]}%'] = df
else:
df2[self.forecast_settings.year_members[j]] = df
j = j + 1
if df2:
df2 = pd.concat(df2, axis=1, names=['member'])
return df2
else:
return pd.DataFrame()
[docs]
def get_output(
self,
variable: Literal["riverflow", "watertable"] = 'riverflow',
header_type: Literal["id", "name"] = "name"
) -> pd.DataFrame:
_check_literal(variable, ["riverflow", "watertable"])
_check_literal(header_type, ["id", "name"])
emission_date = self.forecast_settings.emission_date
data = self._output_to_dataframe(
self._m.getOutputs()
.getVariable(self._variables_ids['output'][variable])
.getData(),
header_type
)
return data.loc[:emission_date]
[docs]
def to_toml(self, path: str) -> None:
"""This method overrides `Simulation.to_toml`"""
self._m.to_toml(
path,
self._optimization_settings._m,
self.forecast_settings._m
)
[docs]
def write_outputs(self, path: str = '.') -> None:
"""This method overrides `Simulation.write_outputs`"""
path = f"{path}/outputs-{self.simulation_settings.name}"
self._create_directory(path)
err = self._m.write_outputs(
path,
self._optimization_settings._m,
self.forecast_settings._m,
self.output_settings._m
)
if err.getInt(0) != 0:
raise RuntimeError(err.getString(0))