Skip to content

Sager - Status

OnsSagerStatus(ons)

Class used for handling ONS Sager status of limitation events. Can be accessed via ons.sager.status.

Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
    """Keep a reference to the top-level ``Ons`` object on the instance.

    This is the shared initializer of the base class that the ONS handler
    classes inherit from.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality and the connection handler.
    """
    # stored privately; the instance reaches everything else through this handle
    self._ons: e_o.Ons = ons

get(period, site_names, output_type='DataFrame')

Gets the status of limitation events for the specified period and sites.

The result has the following columns: - site_name: Name of the site (object) as in the performance database - dataOcorrenciaEvento - fonte - nomeStatus - agente - ehAgenteFavorito - idAgente - codigoAgente - idFonte - usinaConjunto - idUsinaConjunto - idEvento - temRestricao - apresentarVisaoDetalhada - apresentarRedisponibilizar - ehConjunto

Parameters:

  • period

    (DateTimeRange) –

    The time period to retrieve data for.

  • site_names

    (list[str] | None) –

    The names of the sites to retrieve data for. This name must match the object name in the database.

    If set to None, all sites in the database will be requested.

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]

    By default "DataFrame"

Returns:

  • pd.DataFrame | pl.DataFrame

    DataFrame containing the status of limitation events. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.

  • ErrorSummary

    A summary of errors encountered during the retrieval process.

Source code in echo_ons/ons_sager_status.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    site_names: list[str] | None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> tuple[pd.DataFrame | pl.DataFrame, ErrorSummary]:
    """Gets the status of limitation events for the specified period and sites.

    The result has the following columns:
    - site_name: Name of the site (object) as in the performance database
    - dataOcorrenciaEvento
    - fonte
    - nomeStatus
    - agente
    - ehAgenteFavorito
    - idAgente
    - codigoAgente
    - idFonte
    - usinaConjunto
    - idUsinaConjunto
    - idEvento
    - temRestricao
    - apresentarVisaoDetalhada
    - apresentarRedisponibilizar
    - ehConjunto

    Sites that do not have a usable ``ons_site_id`` attribute are skipped and reported in the returned
    ErrorSummary. If no requested site has a usable id, no request is sent and an empty result is returned.

    Parameters
    ----------
    period : DateTimeRange
        The time period to retrieve data for.
    site_names : list[str] | None
        The names of the sites to retrieve data for. This name must match the object name in the database.

        If set to None, all sites in the database will be requested.
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]

        By default "DataFrame"

    Returns
    -------
    pd.DataFrame | pl.DataFrame
        DataFrame containing the status of limitation events. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.
    ErrorSummary
        A summary of errors encountered during the retrieval process.

    Raises
    ------
    ValueError
        If no ``ons_site`` object is found in the database for the requested names.
    """
    # creating error ErrorSummary
    error_summary = ErrorSummary(name="OnsSagerStatus")

    # getting the objects and attributes from the database
    ons_sites = self._ons._perfdb.objects.instances.get(  # noqa: SLF001
        object_names=site_names,
        object_types=["ons_site"],
        get_attributes=True,
        attribute_names=["ons_site_id"],
    )

    if not ons_sites:
        raise ValueError(f"No sites found with the specified names: {site_names}")

    # Building the ONS id -> site name mapping only from sites that have a usable id.
    # The same mapping is reused later to fill "site_name", so skipped sites can never
    # cause a lookup failure or end up as a bogus key.
    site_name_by_id = {}
    for site_name, site_data in ons_sites.items():
        if "ons_site_id" not in site_data or not site_data["ons_site_id"]:
            logger.error(f"Site '{site_name}' does not have 'ons_site_id' attribute. It will be skipped.")

            # creating error object
            error_obj = ErrorObject(
                name=site_name,
                exceptions=[ValueError(f"Site '{site_name}' does not have 'ons_site_id' attribute.")],
            )
            error_summary.add_child(error_obj)

            continue
        site_name_by_id[site_data["ons_site_id"]] = site_name
    site_list = sorted(site_name_by_id)

    all_events = []
    if site_list:
        endpoint = "ConsultarEventos/obterEventosGeral"
        # everything except the page number is the same for all requests
        base_payload = {
            "dataOcorrenciaInicio": f"{period.start:%Y-%m-%d}T03:00:00.000Z",
            "dataOcorrenciaFim": f"{period.end:%Y-%m-%d}T03:00:00.000Z",
            "apenasAgentesFavoritos": False,
            "idsStatus": [],
            "idsFontes": [],
            "idsAgentes": [],
            "idsUsinas": [],
            "idsConjuntos": site_list,
            "ehExportacao": False,
        }

        # iterating each page until an empty one is returned
        page = 0
        while True:
            result = self._ons.sager.conn.post(endpoint=endpoint, json={**base_payload, "pagina": page})
            self._handle_http_errors(result)

            page_events = result.json()["content"]["listaObjetos"]
            if not page_events:
                break

            all_events.extend(page_events)
            page += 1
    else:
        # NOTE(review): an empty "idsConjuntos" presumably means "no filter" (like the other
        # empty id lists in the payload), which would return events of unrelated sites.
        # The request is skipped and an empty result is returned instead.
        logger.warning("No site with a valid 'ons_site_id' was found. No events will be requested.")

    df = pl.from_dicts(
        all_events,
        schema={
            "dataOcorrenciaEvento": pl.String,
            "fonte": pl.String,
            "nomeStatus": pl.String,
            "agente": pl.String,
            "ehAgenteFavorito": pl.Boolean,
            "idAgente": pl.Int32,
            "codigoAgente": pl.String,
            "idFonte": pl.Int32,
            "usinaConjunto": pl.String,
            "idUsinaConjunto": pl.Int32,
            "idEvento": pl.Int32,
            "temRestricao": pl.Boolean,
            "apresentarVisaoDetalhada": pl.Boolean,
            "apresentarRedisponibilizar": pl.Boolean,
            "ehConjunto": pl.Boolean,
        },
    )

    # converting date to datetime without time zone
    df = df.with_columns(
        [
            pl.col("dataOcorrenciaEvento").str.strptime(pl.Datetime, "%d/%m/%Y"),
        ],
    )

    # mapping idUsinaConjunto to site_name (ids that are not in the mapping become null)
    if site_name_by_id:
        site_name_expr = pl.col("idUsinaConjunto").replace_strict(
            site_name_by_id,
            default=None,
            return_dtype=pl.String,
        )
    else:
        # nothing to map against: keep the column present with a stable dtype
        site_name_expr = pl.lit(None, dtype=pl.String)
    df = df.with_columns([site_name_expr.alias("site_name")])

    # sorting by usinaConjunto and dataOcorrenciaEvento
    df = df.sort(["usinaConjunto", "dataOcorrenciaEvento"])

    if output_type == "pl.DataFrame":
        return df, error_summary
    return df.to_pandas(use_pyarrow_extension_array=True), error_summary

import_database(period, site_names=None)

Imports the limitation status into the performance database.

Parameters:

  • period

    (DateTimeRange) –

    The period for which the data will be retrieved.

  • site_names

    (list[str] | None, default: None ) –

    The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.

    By default None.

Returns:

  • ErrorSummary

    A summary of errors encountered during the import process.

Source code in echo_ons/ons_sager_status.py
@validate_call
def import_database(self, period: DateTimeRange, site_names: list[str] | None = None) -> ErrorSummary:
    """Fetch the limitation status for a period and store it in the performance database.

    The events are retrieved with ``get`` and reduced to the columns
    ``date``, ``status``, ``has_limitations`` and ``object_id`` before being
    written to the ``ons_data_validation`` table of the ``performance`` schema.

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.
    site_names : list[str] | None, optional
        The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.

        By default None.

    Returns
    -------
    ErrorSummary
        A summary of errors encountered during the import process.

    Raises
    ------
    ValueError
        If any requested site name is not among the objects of the "ons_sager" data source.
    """
    sager = self._ons.sager

    if not site_names:
        # nothing requested explicitly: fall back to every object of the data source
        site_names = sager.data_source_objects
    else:
        # every requested name must be attached to the data source
        unknown_names = set(site_names) - set(sager.data_source_objects)
        if unknown_names:
            raise ValueError(f"Requested site names not connected to data source ons_sager: {unknown_names}")

    # retrieving the events as a Polars frame
    events, error_summary = self.get(period=period, site_names=site_names, output_type="pl.DataFrame")

    # ids of the requested objects, used to translate site_name into object_id
    ids_by_name = self._ons._perfdb.objects.instances.get_ids(object_names=site_names)  # noqa: SLF001

    # reshaping to the target layout: date, status, has_limitations, object_id
    to_object_id = pl.col("site_name").replace_strict(ids_by_name, default=None).alias("object_id")
    as_plain_date = pl.col("date").cast(pl.Date)
    status_or_unknown = pl.col("status").fill_null(pl.lit("Desconhecido")).alias("status")
    df = (
        events.select(["site_name", "dataOcorrenciaEvento", "nomeStatus", "temRestricao"])
        .with_columns([to_object_id])
        .rename(
            {
                "dataOcorrenciaEvento": "date",
                "nomeStatus": "status",
                "temRestricao": "has_limitations",
            },
        )
        .with_columns([as_plain_date])
        .drop("site_name")
        .with_columns([status_or_unknown])
    )

    # uploading to postgres
    with self._ons._perfdb.conn.reconnect() as conn:  # noqa: SLF001
        logger.info(f"Inserting {len(df)} rows in 'ons_data_validation'")
        conn.polars_to_sql(df=df, table_name="ons_data_validation", if_exists="update", schema="performance")

    return error_summary