Skip to content

Sager - Time Series

OnsSagerTimeseries(ons)

Class used for handling ONS Sager timeseries data. Can be accessed via ons.sager.timeseries.

Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
    """Base-class initializer shared by all ONS sub-handler subclasses.

    Stores a reference to the top-level :class:`Ons` object so subclasses
    can reach the connection handler and the other sub-modules through it.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality and the connection handler.
    """
    self._ons: e_o.Ons = ons

get(period, site_names, output_type='DataFrame')

Gets the timeseries data for the specified period and sites.

The result has the following columns:

  • site_name: Name of the site (object) as in the performance database
  • ons_site_id: Id of the site in ONS systems
  • timestamp_start
  • timestamp_end
  • disponibilidade
  • geracaoLimitada
  • geracaoReferencia
  • geracaoReferenciaFinal
  • geracaoVerificada

Parameters:

  • period

    (DateTimeRange) –

    The time period to retrieve data for.

  • site_names

    (list[str] | None) –

    The names of the sites to retrieve data for. This name must match the object name in the database.

    If set to None, all sites in the database will be requested.

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]

    By default "DataFrame"

Returns:

  • pd.DataFrame | pl.DataFrame

    DataFrame containing the timeseries data. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.

  • ErrorSummary

    A summary of errors encountered during the retrieval process.

Source code in echo_ons/ons_sager_timeseries.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    site_names: list[str] | None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> tuple[pd.DataFrame | pl.DataFrame, ErrorSummary]:
    """Gets the timeseries data for the specified period and sites.

    The result has the following columns:
    - site_name: Name of the site (object) as in the performance database
    - ons_site_id: Id of the site in ONS systems
    - timestamp_start
    - timestamp_end
    - disponibilidade
    - geracaoLimitada
    - geracaoReferencia
    - geracaoReferenciaFinal
    - geracaoVerificada

    Parameters
    ----------
    period : DateTimeRange
        The time period to retrieve data for.
    site_names : list[str] | None
        The names of the sites to retrieve data for. This name must match the object name in the database.

        If set to None, all sites in the database will be requested.
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]

        By default "DataFrame"

    Returns
    -------
    pd.DataFrame | pl.DataFrame
        DataFrame containing the timeseries data. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.
    ErrorSummary
        A summary of errors encountered during the retrieval process.

    Raises
    ------
    ValueError
        If none of the requested site names exist in the database.
    """

    def _event_unavailable(body: object) -> bool:
        # SAGER signals a closed/expired event with a one-element list whose
        # single item carries this exact message. Guarded .get() avoids a
        # KeyError/TypeError when the item has a different shape.
        return (
            isinstance(body, list)
            and len(body) == 1
            and isinstance(body[0], dict)
            and body[0].get("message") == "O evento não está mais disponível para realizar a operação"
        )

    # creating subperiods: the SAGER API is queried one day at a time
    subperiods = period.split_multiple(separator=timedelta(days=1), normalize=True)

    # getting the objects and attributes from the database
    ons_sites = self._ons._perfdb.objects.instances.get(  # noqa: SLF001
        object_names=site_names,
        object_types=["ons_site"],
        get_attributes=True,
        attribute_names=["ons_site_id"],
    )

    # creating error ErrorSummary
    error_summary = ErrorSummary(name="OnsSagerTimeseries")

    if not ons_sites:
        raise ValueError(f"No sites found with the specified names: {site_names}")

    # iterating sites
    df_list = []
    df_schema = {
        "disponibilidade": pl.Float64,
        "geracaoLimitada": pl.Float64,
        "geracaoReferencia": pl.Float64,
        "geracaoReferenciaFinal": pl.Float64,
        "geracaoVerificada": pl.Float64,
        "horario": pl.String,
    }
    for site_name, site_data in ons_sites.items():
        if "ons_site_id" not in site_data or not site_data["ons_site_id"]:
            logger.error(f"Site '{site_name}' does not have 'ons_site_id' attribute. It will be skipped.")

            # creating error object
            error_obj = ErrorObject(
                name=site_name,
                exceptions=[ValueError(f"Site '{site_name}' does not have 'ons_site_id' attribute.")],
            )
            error_summary.add_child(error_obj)

            continue

        # getting site_id
        site_id = site_data["ons_site_id"]

        # iterating subperiods
        for subperiod in subperiods:
            endpoint = "ConsultarEventos/obterEventoDetalhado"
            payload = {
                "dataOcorrencia": f"{subperiod.start:%Y-%m-%d}T03:00:00.000Z",
                "idUsinaConjunto": site_id,
                "ehConjunto": True,
            }

            # get data passing payload as query string parameters; parse the
            # body once instead of re-calling result.json() per condition
            result = self._ons.sager.conn.get(endpoint=endpoint, params=payload, response_ok=None)
            body = result.json()

            if _event_unavailable(body):
                # if we failed to get from historical data, lets get for the open events (not yet consisted)
                endpoint = "ConsistenciaDoAgente/obterEvento"
                result = self._ons.sager.conn.get(endpoint=endpoint, params=payload, response_ok=None)
                body = result.json()

            if _event_unavailable(body):
                logger.warning(f"Event not available for site '{site_name}' and subperiod '{subperiod}'")
                continue

            if result.status_code != 200:
                logger.error(f"Failed to retrieve data for site '{site_name}' and subperiod '{subperiod}': {result.text}")
                # creating error object
                error_obj = ErrorObject(
                    name=site_name,
                    exceptions=[
                        ValueError(f"Failed to retrieve data for site '{site_name}' and subperiod '{subperiod}': {result.text}"),
                    ],
                )
                error_summary.add_child(error_obj)
                continue

            self._handle_http_errors(result)

            # skip subperiods without hourly event data
            if "content" not in body or "horasEvento" not in body["content"]:
                continue

            power_data = body["content"]["horasEvento"]

            df = pl.from_dicts(
                power_data,
                schema=df_schema,
            )

            # horário is like 00:00 - 00:29, split it into timestamp_start and timestamp_end;
            # the parsed %H:%M datetime gets its date replaced by the subperiod's day
            df = df.with_columns(
                [
                    (
                        pl.col("horario")
                        .str.split(" - ")
                        .list.first()
                        .str.strptime(pl.Datetime, "%H:%M")
                        .dt.replace(year=subperiod.start.year, month=subperiod.start.month, day=subperiod.start.day)
                    ).alias("timestamp_start"),
                    (
                        pl.col("horario")
                        .str.split(" - ")
                        .list.last()
                        .str.strptime(pl.Datetime, "%H:%M")
                        # second=59 so the interval end is inclusive of the whole minute
                        .dt.replace(year=subperiod.start.year, month=subperiod.start.month, day=subperiod.start.day, second=59)
                    ).alias("timestamp_end"),
                ],
            )

            # dropping unnecessary columns
            df = df.drop(["horario"])

            # adding the site_name and ons_site_id as columns
            df = df.with_columns(
                [
                    pl.lit(site_name).alias("site_name"),
                    pl.lit(site_id).alias("ons_site_id"),
                ],
            )

            df_list.append(
                df,
            )

    # if df_list is empty, add an empty DataFrame so concat/sort still work
    if not df_list:
        df_list.append(pl.DataFrame(schema=df_schema | {"site_name": pl.Utf8, "ons_site_id": pl.Int64}))

    # concatenating all DataFrames
    df = pl.concat(df_list)

    # sorting by site_name and timestamp_start
    df = df.sort(["site_name", "timestamp_start"])

    if output_type == "pl.DataFrame":
        return df, error_summary

    return df.to_pandas(use_pyarrow_extension_array=True), error_summary

import_database(period, site_names=None)

Imports the limitations time series to the performance database

Parameters:

  • period

    (DateTimeRange) –

    The period for which the data will be retrieved.

  • site_names

    (list[str] | None, default: None ) –

    The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.

    By default None.

Returns:

  • ErrorSummary

    A summary of errors encountered during the import process.

Source code in echo_ons/ons_sager_timeseries.py
@validate_call
def import_database(self, period: DateTimeRange, site_names: list[str] | None = None) -> ErrorSummary:
    """Imports the limitations time series to the performance database

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.
    site_names : list[str] | None, optional
        The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.

        By default None.

    Returns
    -------
    ErrorSummary
        A summary of errors encountered during the import process.

    Raises
    ------
    ValueError
        If any requested site name is not connected to the ons_sager data source.
    """
    # checking if all the requested objects exist in self._ons.sager.data_source_objects
    if site_names:
        wrong_objs = set(site_names) - set(self._ons.sager.data_source_objects)
        if wrong_objs:
            raise ValueError(f"Requested site names not connected to data source ons_sager: {wrong_objs}")
    else:
        site_names = self._ons.sager.data_source_objects

    # getting the data as polars for the column-level operations below
    df, error_summary = self.get(period=period, site_names=site_names, output_type="pl.DataFrame")

    # adding type hint (narrows the pd|pl union returned by get)
    df: pl.DataFrame

    if len(df) == 0:
        logger.warning("No data found for the specified period and sites.")
        return error_summary

    # getting all the features that are associated with this data source type
    features = self._ons._perfdb.features.definitions.get(object_models=["ons_site"], data_source_types=["ons_api"])  # noqa: SLF001

    # checking if all the wanted features exist in the DataFrame
    wanted_names = set(features["name_in_data_source"].dropna().to_list())
    missing_names = wanted_names - set(df.columns)

    if missing_names:
        logger.warning(f"Could not find the following names in the DataFrame returned from ONS: {missing_names}")
        # adding to error_summary
        for site in site_names:
            error_obj = ErrorObject(
                name=site,
                exceptions=[ValueError(f"Could not find the following names in the DataFrame returned from ONS: {missing_names}")],
            )
            error_summary.add_child(error_obj)

    # filtering only the wanted columns that are actually present: selecting
    # (or renaming) a missing column would raise in polars, defeating the
    # warning-and-continue handling above. sorted() keeps column order stable.
    present_names = wanted_names - missing_names
    wanted_cols = ["timestamp_start", "site_name", *sorted(present_names)]
    df = df[wanted_cols]

    # creating dict to rename columns (restricted to columns we kept)
    rename_dict = {
        "timestamp_start": "timestamp",
        "site_name": "object_name",
    }
    feature_rename = features.reset_index()[["name", "name_in_data_source"]].set_index("name_in_data_source")["name"].to_dict()
    rename_dict |= {src: dst for src, dst in feature_rename.items() if src in present_names}

    df = df.rename(rename_dict)

    # converting to pandas
    df = df.to_pandas(use_pyarrow_extension_array=True)

    # pivoting to one column per (object_name, feature_name) pair
    df = df.pivot(columns="object_name", index=["timestamp"])
    # swap column levels
    df = df.swaplevel(axis=1)
    # rename column levels
    df.columns.names = ["object_name", "feature_name"]

    # converting index to datetime index
    df.index = pd.to_datetime(df.index)

    # uploading
    self._ons._perfdb.features.values.series.insert(df)  # noqa: SLF001

    return error_summary