Sager - Limitations¶
OnsSagerLimitations(ons)
¶
Class used for handling ONS Sager limitation events. Can be accessed via ons.sager.limitations.
Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a reference to the top-level Ons facade; subclasses use it to reach
    # the SAGER connection handler and the performance database.
    self._ons: e_o.Ons = ons
get(period, site_names, output_type='DataFrame')
¶
Gets the limitation events for the specified period and sites.
The result has the following columns: site_name (name of the site/object as in the performance database), ons_site_id (id of the site in ONS systems), idEvento, horaInicial, horaFinal, descricao, razao, origem, valorDeLimitacao, errors, warnings, and id.
Parameters:
-
period (DateTimeRange) – The time period to retrieve data for.
-
site_names (list[str] | None) – The names of the sites to retrieve data for. This name must match the object name in the database.
If set to None, all sites in the database will be requested.
-
output_type (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"].
By default "DataFrame"
Returns:
-
pd.DataFrame | pl.DataFrame – DataFrame containing the limitation events. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.
-
ErrorSummary – A summary of errors encountered during the retrieval process.
Source code in echo_ons/ons_sager_limitations.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    site_names: list[str] | None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> tuple[pd.DataFrame | pl.DataFrame, ErrorSummary]:
    """Gets the limitation events for the specified period and sites.

    The result has the following columns:

    - site_name: Name of the site (object) as in the performance database
    - ons_site_id: Id of the site in ONS systems
    - idEvento
    - horaInicial
    - horaFinal
    - descricao
    - razao
    - origem
    - valorDeLimitacao
    - errors
    - warnings
    - id

    Parameters
    ----------
    period : DateTimeRange
        The time period to retrieve data for.
    site_names : list[str] | None
        The names of the sites to retrieve data for. This name must match the object name in the database.
        If set to None, all sites in the database will be requested.
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]
        By default "DataFrame"

    Returns
    -------
    pd.DataFrame | pl.DataFrame
        DataFrame containing the limitation events. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.
    ErrorSummary
        A summary of errors encountered during the retrieval process.
    """
    # The SAGER API answers this exact single-message list when an event is no
    # longer (or not yet) available on the queried endpoint.
    unavailable_msg = "O evento não está mais disponível para realizar a operação"

    def _event_unavailable(body: object) -> bool:
        """True when the API returned the single 'event not available' message."""
        return isinstance(body, list) and len(body) == 1 and body[0]["message"] == unavailable_msg

    # creating subperiods (the API is queried one day at a time)
    subperiods = period.split_multiple(separator=timedelta(days=1), normalize=True)
    # getting the objects and attributes from the database
    ons_sites = self._ons._perfdb.objects.instances.get(  # noqa: SLF001
        object_names=site_names,
        object_types=["ons_site"],
        get_attributes=True,
        attribute_names=["ons_site_id"],
    )
    # creating error ErrorSummary
    error_summary = ErrorSummary(name="OnsSagerLimitations")
    if not ons_sites:
        raise ValueError(f"No sites found with the specified names: {site_names}")
    # iterating sites
    df_list = []
    df_schema = {
        "idEvento": pl.Int32,
        "horaInicial": pl.String,
        "horaFinal": pl.String,
        "descricao": pl.String,
        "razao": pl.String,
        "origem": pl.String,
        "valorDeLimitacao": pl.Float64,
        "errors": pl.List(pl.String),
        "warnings": pl.List(pl.String),
        "id": pl.Int32,
    }
    for site_name, site_data in ons_sites.items():
        if "ons_site_id" not in site_data or not site_data["ons_site_id"]:
            logger.error(f"Site '{site_name}' does not have 'ons_site_id' attribute. It will be skipped.")
            # creating error object
            error_obj = ErrorObject(
                name=site_name,
                exceptions=[ValueError(f"Site '{site_name}' does not have 'ons_site_id' attribute.")],
            )
            error_summary.add_child(error_obj)
            continue
        # getting site_id
        site_id = site_data["ons_site_id"]
        # iterating subperiods
        for subperiod in subperiods:
            endpoint = "ConsultarEventos/obterEventoDetalhado"  # this endpoint gets data from the historical data
            payload = {
                "dataOcorrencia": f"{subperiod.start:%Y-%m-%d}T03:00:00.000Z",
                "idUsinaConjunto": site_id,
                "ehConjunto": True,
            }
            # get data passing payload as query string parameters
            result = self._ons.sager.conn.get(endpoint=endpoint, params=payload, response_ok=None)
            # parse the body once; requests.Response.json() re-parses on every call
            body = result.json()
            if _event_unavailable(body):
                # if we failed to get from historical data, lets get for the open events (not yet consisted)
                endpoint = "ConsistenciaDoAgente/obterEvento"
                result = self._ons.sager.conn.get(endpoint=endpoint, params=payload, response_ok=None)
                body = result.json()
                if _event_unavailable(body):
                    logger.warning(f"Limitation events not available for site '{site_name}' and subperiod '{subperiod}'")
                    continue
            if result.status_code != 200:
                logger.error(f"Failed to retrieve data for site '{site_name}' and subperiod '{subperiod}': {result.text}")
                # creating error object
                error_obj = ErrorObject(
                    name=site_name,
                    exceptions=[
                        ValueError(f"Failed to retrieve data for site '{site_name}' and subperiod '{subperiod}': {result.text}"),
                    ],
                )
                error_summary.add_child(error_obj)
                continue
            self._handle_http_errors(result)
            # skip days whose response carries no limitation entries
            if "content" not in body or "restricoes" not in body["content"]:
                continue
            df = pl.from_dicts(
                body["content"]["restricoes"],
                schema=df_schema,
            )
            # converting horaFinal and horaInicial to datetime
            # the string value only has HH:MM, we need to add the date from the subperiod
            df = df.with_columns(
                [
                    pl.col("horaInicial")
                    .str.strptime(pl.Datetime, "%H:%M")
                    .dt.replace(year=subperiod.start.year, month=subperiod.start.month, day=subperiod.start.day),
                    pl.col("horaFinal")
                    .str.strptime(pl.Datetime, "%H:%M")
                    .dt.replace(year=subperiod.start.year, month=subperiod.start.month, day=subperiod.start.day, second=59),
                ],
            )
            # adding the site_name and ons_site_id as columns
            df: pl.DataFrame = df.with_columns(
                [
                    pl.lit(site_name).alias("site_name"),
                    pl.lit(site_id).alias("ons_site_id"),
                ],
            )
            df_list.append(df)
    # if df_list is empty, add an empty DataFrame
    # NOTE(review): in this empty fallback horaInicial/horaFinal stay pl.String,
    # whereas populated frames carry pl.Datetime — confirm downstream consumers tolerate this
    if not df_list:
        df_list.append(pl.DataFrame(schema=df_schema | {"site_name": pl.Utf8, "ons_site_id": pl.Int64}))
    # concatenating all DataFrames
    df = pl.concat(df_list)
    # sorting by site_name and horaInicial
    df = df.sort(["site_name", "horaInicial"])
    if output_type == "pl.DataFrame":
        return df, error_summary
    return df.to_pandas(use_pyarrow_extension_array=True), error_summary
import_database(period, site_names=None)
¶
Imports the limitations events to the performance database
Parameters:
-
period (DateTimeRange) – The period for which the data will be retrieved.
-
site_names (list[str] | None, default: None) – The names of the sites to import data for. If None, all sites attached to the "ons_sager" data source will be imported.
By default None.
Returns:
-
ErrorSummary – A summary of errors encountered during the import process.
Source code in echo_ons/ons_sager_limitations.py
@validate_call
def import_database(self, period: DateTimeRange, site_names: list[str] | None = None) -> ErrorSummary:
    """Imports the limitations events to the performance database

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.
    site_names : list[str] | None, optional
        The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.
        By default None.

    Returns
    -------
    ErrorSummary
        A summary of errors encountered during the import process.
    """
    # Resolve the site list: explicit names must all be attached to the
    # ons_sager data source; otherwise default to every attached site.
    if site_names:
        unknown = set(site_names) - set(self._ons.sager.data_source_objects)
        if unknown:
            raise ValueError(f"Requested site names not connected to data source ons_sager: {unknown}")
    else:
        site_names = self._ons.sager.data_source_objects
    # Fetch the events as a Polars frame plus the accumulated error summary.
    events_df, error_summary = self.get(period=period, site_names=site_names, output_type="pl.DataFrame")
    if len(events_df) == 0:
        logger.warning("No limitations events found for the specified period and sites.")
        return error_summary
    # Map the SAGER column names onto the performance-database schema
    # (object_name, start, end, reason, origin, limitation_value, description).
    column_map = {
        "site_name": "object_name",
        "horaInicial": "start",
        "horaFinal": "end",
        "razao": "reason",
        "origem": "origin",
        "valorDeLimitacao": "limitation_value",
        "descricao": "description",
    }
    events_df = events_df.select([pl.col(src).alias(dst) for src, dst in column_map.items()])
    # Convert to pandas and upload, updating rows that already exist.
    pandas_df = events_df.to_pandas(use_pyarrow_extension_array=True)
    self._ons._perfdb.ons.limitations.insert(df=pandas_df, group_type="site", on_conflict="update")  # noqa: SLF001
    return error_summary