Skip to content

Sager - Limitations

OnsSagerLimitations(ons)

Class used for handling ONS Sager limitation events. Can be accessed via ons.sager.limitations.

Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
    """Initialize the base handler shared by all subclasses.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a reference to the top-level Ons facade; all functionality
    # (database access, SAGER connection, ...) is reached through it.
    self._ons: e_o.Ons = ons

get(period, site_names, output_type='DataFrame')

Gets the limitation events for the specified period and sites.

The result has the following columns:

  • site_name: Name of the site (object) as in the performance database
  • ons_site_id: Id of the site in ONS systems
  • idEvento
  • horaInicial
  • horaFinal
  • descricao
  • razao
  • origem
  • valorDeLimitacao
  • errors
  • warnings
  • id

Parameters:

  • period

    (DateTimeRange) –

    The time period to retrieve data for.

  • site_names

    (list[str] | None) –

    The names of the sites to retrieve data for. This name must match the object name in the database.

    If set to None, all sites in the database will be requested.

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]

    By default "DataFrame"

Returns:

  • pd.DataFrame | pl.DataFrame

    DataFrame containing the limitation events. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.

  • ErrorSummary

    A summary of errors encountered during the retrieval process.

Source code in echo_ons/ons_sager_limitations.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    site_names: list[str] | None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> tuple[pd.DataFrame | pl.DataFrame, ErrorSummary]:
    """Gets the limitation events for the specified period and sites.

    The result has the following columns:
    - site_name: Name of the site (object) as in the performance database
    - ons_site_id: Id of the site in ONS systems
    - idEvento
    - horaInicial
    - horaFinal
    - descricao
    - razao
    - origem
    - valorDeLimitacao
    - errors
    - warnings
    - id

    Parameters
    ----------
    period : DateTimeRange
        The time period to retrieve data for.
    site_names : list[str] | None
        The names of the sites to retrieve data for. This name must match the object name in the database.

        If set to None, all sites in the database will be requested.
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]

        By default "DataFrame"

    Returns
    -------
    pd.DataFrame | pl.DataFrame
        DataFrame containing the limitation events. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.
    ErrorSummary
        A summary of errors encountered during the retrieval process.
    """

    def _event_unavailable(body: object) -> bool:
        """Return True when the API replied with the single 'event no longer available' message.

        Guarded with isinstance/.get so a malformed payload (non-dict item, missing
        "message" key) is treated as "not this message" instead of raising.
        """
        return (
            isinstance(body, list)
            and len(body) == 1
            and isinstance(body[0], dict)
            and body[0].get("message") == "O evento não está mais disponível para realizar a operação"
        )

    # creating subperiods (one request per day, normalized to day boundaries)
    subperiods = period.split_multiple(separator=timedelta(days=1), normalize=True)

    # getting the objects and attributes from the database
    ons_sites = self._ons._perfdb.objects.instances.get(  # noqa: SLF001
        object_names=site_names,
        object_types=["ons_site"],
        get_attributes=True,
        attribute_names=["ons_site_id"],
    )

    # creating error ErrorSummary
    error_summary = ErrorSummary(name="OnsSagerLimitations")

    if not ons_sites:
        raise ValueError(f"No sites found with the specified names: {site_names}")

    # iterating sites
    df_list = []
    df_schema = {
        "idEvento": pl.Int32,
        "horaInicial": pl.String,
        "horaFinal": pl.String,
        "descricao": pl.String,
        "razao": pl.String,
        "origem": pl.String,
        "valorDeLimitacao": pl.Float64,
        "errors": pl.List(pl.String),
        "warnings": pl.List(pl.String),
        "id": pl.Int32,
    }
    for site_name, site_data in ons_sites.items():
        if "ons_site_id" not in site_data or not site_data["ons_site_id"]:
            logger.error(f"Site '{site_name}' does not have 'ons_site_id' attribute. It will be skipped.")

            # creating error object
            error_obj = ErrorObject(
                name=site_name,
                exceptions=[ValueError(f"Site '{site_name}' does not have 'ons_site_id' attribute.")],
            )
            error_summary.add_child(error_obj)

            continue

        # getting site_id
        site_id = site_data["ons_site_id"]

        # iterating subperiods
        for subperiod in subperiods:
            endpoint = "ConsultarEventos/obterEventoDetalhado"  # this endpoint gets data from the historical data
            payload = {
                # NOTE(review): T03:00:00.000Z looks like midnight Brazil time expressed in UTC — confirm
                "dataOcorrencia": f"{subperiod.start:%Y-%m-%d}T03:00:00.000Z",
                "idUsinaConjunto": site_id,
                "ehConjunto": True,
            }

            # get data passing payload as query string parameters
            result = self._ons.sager.conn.get(endpoint=endpoint, params=payload, response_ok=None)

            # parse the body once; requests re-parses the payload on every .json() call
            body = result.json()

            if _event_unavailable(body):
                # if we failed to get from historical data, lets get for the open events (not yet consisted)
                endpoint = "ConsistenciaDoAgente/obterEvento"
                result = self._ons.sager.conn.get(endpoint=endpoint, params=payload, response_ok=None)
                body = result.json()

            if _event_unavailable(body):
                logger.warning(f"Limitation events not available for site '{site_name}' and subperiod '{subperiod}'")
                continue

            if result.status_code != 200:
                logger.error(f"Failed to retrieve data for site '{site_name}' and subperiod '{subperiod}': {result.text}")
                # creating error object
                error_obj = ErrorObject(
                    name=site_name,
                    exceptions=[
                        ValueError(f"Failed to retrieve data for site '{site_name}' and subperiod '{subperiod}': {result.text}"),
                    ],
                )
                error_summary.add_child(error_obj)
                continue

            self._handle_http_errors(result)

            # skip subperiods whose payload carries no restriction records
            if "content" not in body or "restricoes" not in body["content"]:
                continue

            df = pl.from_dicts(
                body["content"]["restricoes"],
                schema=df_schema,
            )

            # converting horaFinal and horaInicial to datetime
            # the string value only has HH:MM, we need to add the date from the subperiod
            df = df.with_columns(
                [
                    pl.col("horaInicial")
                    .str.strptime(pl.Datetime, "%H:%M")
                    .dt.replace(year=subperiod.start.year, month=subperiod.start.month, day=subperiod.start.day),
                    # second=59 makes the event end inclusive of the whole final minute
                    pl.col("horaFinal")
                    .str.strptime(pl.Datetime, "%H:%M")
                    .dt.replace(year=subperiod.start.year, month=subperiod.start.month, day=subperiod.start.day, second=59),
                ],
            )

            # adding the site_name and ons_site_id as columns
            df: pl.DataFrame = df.with_columns(
                [
                    pl.lit(site_name).alias("site_name"),
                    pl.lit(site_id).alias("ons_site_id"),
                ],
            )

            df_list.append(
                df,
            )

    # if df_list is empty, add an empty DataFrame so concat/sort below still work
    if not df_list:
        df_list.append(pl.DataFrame(schema=df_schema | {"site_name": pl.Utf8, "ons_site_id": pl.Int64}))

    # concatenating all DataFrames
    df = pl.concat(df_list)

    # sorting by site_name and horaInicial
    df = df.sort(["site_name", "horaInicial"])

    if output_type == "pl.DataFrame":
        return df, error_summary

    return df.to_pandas(use_pyarrow_extension_array=True), error_summary

import_database(period, site_names=None)

Imports the limitations events to the performance database

Parameters:

  • period

    (DateTimeRange) –

    The period for which the data will be retrieved.

  • site_names

    (list[str] | None, default: None ) –

    The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.

    By default None.

Returns:

  • ErrorSummary

    A summary of errors encountered during the import process.

Source code in echo_ons/ons_sager_limitations.py
@validate_call
def import_database(self, period: DateTimeRange, site_names: list[str] | None = None) -> ErrorSummary:
    """Imports the limitations events to the performance database

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.
    site_names : list[str] | None, optional
        The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.

        By default None.

    Returns
    -------
    ErrorSummary
        A summary of errors encountered during the import process.
    """
    # default to every object attached to the ons_sager data source;
    # otherwise reject names that are not connected to it
    if not site_names:
        site_names = self._ons.sager.data_source_objects
    else:
        wrong_objs = set(site_names).difference(self._ons.sager.data_source_objects)
        if wrong_objs:
            raise ValueError(f"Requested site names not connected to data source ons_sager: {wrong_objs}")

    # retrieve the events as a Polars DataFrame
    df, error_summary = self.get(period=period, site_names=site_names, output_type="pl.DataFrame")

    if df.height == 0:
        logger.warning("No limitations events found for the specified period and sites.")
        return error_summary

    # map the API column names onto the performance-database schema and keep
    # only those columns, in this order
    column_map = {
        "site_name": "object_name",
        "horaInicial": "start",
        "horaFinal": "end",
        "razao": "reason",
        "origem": "origin",
        "valorDeLimitacao": "limitation_value",
        "descricao": "description",
    }
    df = df.rename(column_map).select(list(column_map.values()))

    # convert to pandas for the database layer
    df = df.to_pandas(use_pyarrow_extension_array=True)

    # upload
    self._ons._perfdb.ons.limitations.insert(df=df, group_type="site", on_conflict="update")  # noqa: SLF001

    return error_summary