Open Data - Sites

OnsOpenDataSites(ons)

Class used to get the sites from ONS Open Data. Can be accessed via ons.opendata.sites.

The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

Parameters:

  • ons

    (Ons) –

    Top level object carrying all functionality.

Source code in echo_ons/ons_opendata_sites.py
def __init__(self, ons: e_o.Ons) -> None:
    """Class used to get the sites from ONS Open Data. Can be accessed via `ons.opendata.sites`.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality.
    """
    super().__init__(ons)

    # * subclasses

    self.limitations = OnsOpenDataSiteLimitations(ons)

get(output_type='DataFrame')

Gets all the sites from ONS Open Data.

The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

Parameters:

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Type of output to return. Can be "DataFrame" or "pl.DataFrame". By default "DataFrame".

Returns:

  • DataFrame

    DataFrame containing the sites from ONS Open Data. The following columns are available:

    • id_subsistema
    • nom_subsistema
    • estad_id
    • nom_estado
    • id_ons_conjunto
    • nom_conjunto
Source code in echo_ons/ons_opendata_sites.py
@validate_call
def get(self, output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame") -> pd.DataFrame:
    """Gets all the sites from ONS Open Data.

    The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

    Parameters
    ----------
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Type of output to return. Can be "DataFrame" or "pl.DataFrame". By default "DataFrame".

    Returns
    -------
    pd.DataFrame
        DataFrame containing the sites from ONS Open Data. The following columns are available:

        - id_subsistema
        - nom_subsistema
        - estad_id
        - nom_estado
        - id_ons_conjunto
        - nom_conjunto

    """
    df = self._get_ons_file()

    # first, let's drop duplicates based only on id_ons_conjunto and nom_conjunto
    df = df.unique(subset=["id_ons_conjunto", "nom_conjunto"])

    # now, if the same nom_conjunto still appears more than once, append "_2", "_3", ... to the duplicates (e.g. "Conj. Foo" and "Conj. Foo_2")
    df = df.with_columns(
        (
            pl.col("nom_conjunto").cast(pl.Utf8)
            + pl.col("nom_conjunto")
            .cum_count()
            .over("nom_conjunto")
            .map_elements(
                lambda x: "" if x == 1 else f"_{x}",
                return_dtype=pl.Utf8,
            )
        ).alias("nom_conjunto"),
    )

    # removing unwanted columns
    df = df.drop(["id_tipousina", "id_ons_usina", "nom_usina", "ceg", "dat_iniciorelacionamento", "dat_fimrelacionamento"])

    # dropping duplicates
    df = df.unique()

    if output_type == "DataFrame":
        return df.to_pandas(use_pyarrow_extension_array=True)

    return df
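
Example usage (a minimal sketch; it assumes an already initialized Ons instance named ons — the import path and constructor arguments shown here are assumptions, check your own setup):

import pandas as pd
import polars as pl
from echo_ons import Ons  # assumed import path

ons = Ons()  # hypothetical initialization; real arguments depend on your environment

# pandas output (default)
sites_pd: pd.DataFrame = ons.opendata.sites.get()

# polars output
sites_pl: pl.DataFrame = ons.opendata.sites.get(output_type="pl.DataFrame")
print(sites_pl.select(["id_ons_conjunto", "nom_conjunto"]).head())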

get_related_spes(output_type='DataFrame')

Gets all the related SPEs from ONS Open Data.

The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

Parameters:

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame' ) –

    Type of output to return. Can be "DataFrame" or "pl.DataFrame". By default "DataFrame".

Returns:

  • DataFrame

    DataFrame containing the related SPEs from ONS Open Data. The following columns are available:

    • site_key
    • site_name
    • spe_key (also known as CEG in ANEEL data)
    • spe_name
    • relation_start
    • relation_end
    • federation_state
Source code in echo_ons/ons_opendata_sites.py
@validate_call
def get_related_spes(self, output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame") -> pd.DataFrame:
    """Gets all the related SPEs from ONS Open Data.

    The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

    Parameters
    ----------
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Type of output to return. Can be "DataFrame" or "pl.DataFrame". By default "DataFrame".

    Returns
    -------
    pd.DataFrame
        DataFrame containing the related SPEs from ONS Open Data. The following columns are available:

        - site_key
        - site_name
        - spe_key (also known as CEG in ANEEL data)
        - spe_name
        - relation_start
        - relation_end
        - federation_state
    """
    df = self._get_ons_file()

    # removing unwanted columns
    df = df.drop(
        [
            "id_tipousina",
            "id_ons_usina",
            "id_subsistema",
            "nom_subsistema",
            "estad_id",
        ],
    )

    # renaming columns
    df = df.rename(
        {
            "id_ons_conjunto": "site_key",
            "nom_conjunto": "site_name",
            "ceg": "spe_key",
            "nom_usina": "spe_name",
            "dat_iniciorelacionamento": "relation_start",
            "dat_fimrelacionamento": "relation_end",
            "nom_estado": "federation_state",
        },
    )

    # getting correct names for the sites
    sites_df = df[["site_key", "site_name"]].unique()
    sites_df = sites_df.with_columns(
        (
            pl.col("site_name").cast(pl.Utf8)
            + pl.col("site_name").cum_count().over("site_name").map_elements(lambda x: "" if x == 1 else f"_{x}", return_dtype=pl.Utf8)
        ).alias("site_name"),
    )
    # remove "Conj. " from name
    sites_df = sites_df.with_columns(
        pl.col("site_name").str.replace("Conj. ", ""),
    )
    df = df.drop("site_name")
    df = df.join(sites_df, on="site_key", how="left")

    # removing the trailing ".XX" from spe_key (it's only a check digit and might cause problems when joining with ANEEL data)
    df = df.with_columns(
        pl.col("spe_key").str.replace(r"\.\d*$", ""),
    )

    df = df[["site_key", "site_name", "spe_key", "spe_name", "relation_start", "relation_end"]]

    if output_type == "DataFrame":
        return df.to_pandas(use_pyarrow_extension_array=True)
    return df
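
Example usage (a minimal sketch; it assumes an already initialized Ons instance named ons; the ANEEL DataFrame below is purely illustrative — only the spe_key join column mirrors the documented output):

import polars as pl

related = ons.opendata.sites.get_related_spes(output_type="pl.DataFrame")

# hypothetical ANEEL data keyed by CEG (with the check digit already stripped, like spe_key above)
aneel_df = pl.DataFrame({"spe_key": ["EXAMPLE.CEG.KEY.001"], "owner": ["Example Owner"]})

# spe_key has its trailing check digit removed, so it can be joined directly with such ANEEL data
joined = related.join(aneel_df, on="spe_key", how="left")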

import_database(skip_existing=True)

Imports the sites from ONS Open Data to the database.

Parameters:

  • skip_existing

    (bool, default: True ) –

    If set to False, all attributes of existing sites are overwritten; if True, sites that already exist in the database are skipped. By default True.

Source code in echo_ons/ons_opendata_sites.py
@validate_call
def import_database(self, skip_existing: bool = True) -> None:
    """Imports the sites from ONS Open Data to the database.

    Parameters
    ----------
    skip_existing : bool, optional
        If set to False, all attributes of existing sites are overwritten; if True, sites that already exist in the database are skipped. By default True.
    """
    # getting the sites
    df: pl.DataFrame = self.get(output_type="pl.DataFrame")

    # getting all the existing sites
    existing = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    existing = pl.from_pandas(existing.reset_index(drop=False))
    # adding column ons_site_key if it does not exist
    if "ons_site_key" not in existing.columns:
        existing = existing.with_columns(pl.lit(None).cast(pl.String).alias("ons_site_key"))

    # keeping only the non-existing sites if skip_existing is True
    if skip_existing:
        df = df.filter(~pl.col("id_ons_conjunto").is_in(existing["ons_site_key"]))

    if df.is_empty():
        logger.info("No new sites to import.")
        return

    # renaming columns
    df = df.rename(
        {
            "id_ons_conjunto": "ons_site_key",
            "nom_conjunto": "name",
            "nom_subsistema": "ons_subsystem",
            "nom_estado": "federation_state",
        },
    )
    # remove "Conj. " from name
    df = df.with_columns(
        pl.col("name").str.replace("Conj. ", ""),
    )
    # adding "ONS-Site-" to name
    df = df.with_columns(
        ("ONS-Site-" + pl.col("name")).alias("name"),
    )

    # first, let's add the objects that are not in the database
    new_objects = df.filter(~pl.col("ons_site_key").is_in(existing["ons_site_key"]))
    for row in new_objects[["name", "ons_site_key"]].iter_rows(named=True):
        self._ons._perfdb.objects.instances.insert(  # noqa: SLF001
            object_name=row["name"],
            object_model_name="ons_site_general",
            description=f"""Site "{row["name"].replace("ONS-Site-", "")}" from ONS Open Data (created automatically)""",
            on_conflict="ignore",
        )
        # now let's set exists_in_bazefield to False
        self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
            object_name=row["name"],
            attribute_name="exists_in_bazefield",
            attribute_value=False,
            on_conflict="update",
        )
        # also defining the ons_site_key
        self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
            object_name=row["name"],
            attribute_name="ons_site_key",
            attribute_value=row["ons_site_key"],
            on_conflict="update",
        )

    # getting all objects again to get the new ones
    existing = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    existing = pl.from_pandas(existing.reset_index(drop=False))
    # creating a mapping from ons_site_key to object_name
    ons_site_key_to_object_name = dict(zip(existing["ons_site_key"], existing["name"], strict=False))

    # * related SPEs

    related_spes = self.get_related_spes(output_type="pl.DataFrame")
    # getting all spes from the database to create the mapping
    spes = self._ons._perfdb.objects.instances.get(object_types=["ons_spe_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    spes = pl.from_pandas(spes.reset_index(drop=False))
    # merging the related_spes with the spes based on ons_spe_key to get the database name
    related_spes = related_spes.join(
        spes[["name", "ons_spe_key"]].rename({"name": "spe_db_name", "ons_spe_key": "spe_key"}),
        on="spe_key",
        how="left",
    )
    # merging the related_spes with the sites to get the object_name in the database
    related_spes = related_spes.join(
        existing[["name", "ons_site_key"]].rename({"name": "site_db_name", "ons_site_key": "site_key"}),
        on="site_key",
        how="left",
    )

    # dropping rows without spe_db_name or site_db_name
    related_spes = related_spes.filter(~pl.col("spe_db_name").is_null() & ~pl.col("site_db_name").is_null())

    # inserting on the database
    related_spes = related_spes[["site_db_name", "spe_db_name", "relation_start", "relation_end"]].rename(
        {"site_db_name": "ons_site_name", "spe_db_name": "ons_spe_name"},
    )
    self._ons._perfdb.ons.spes.sitemapping.insert(data=related_spes, on_conflict="update")  # noqa: SLF001

    # * coordinates and other SPE aggregate attributes

    related_spes = related_spes.join(
        spes[["name", "latitude", "longitude", "nominal_power", "connection_point", "ons_spe_type"]],
        left_on="ons_spe_name",
        right_on="name",
        how="left",
    )
    # grouping by ons_site_name
    site_spe_attrs_df = related_spes.group_by("ons_site_name").agg(
        pl.col("ons_spe_type").mode().first().alias("ons_site_type"),
        pl.col("latitude").mean().alias("latitude"),
        pl.col("longitude").mean().alias("longitude"),
        pl.col("nominal_power").sum().alias("nominal_power"),
        pl.col("connection_point").mode().first().alias("connection_point"),
    )
    existing = existing[["ons_site_key", "name", "federation_state", "ons_subsystem"]].join(
        site_spe_attrs_df.rename({"ons_site_name": "name"}),
        on="name",
        how="left",
    )

    # * data insertion

    # then let's update the attributes
    attrs = ["ons_subsystem", "federation_state", "latitude", "longitude", "nominal_power", "connection_point", "ons_site_type"]
    for row in existing[["ons_site_key", *attrs]].iter_rows(named=True):
        for attr in attrs:
            if row[attr] is None:
                logger.warning(f"Site {row['ons_site_key']} has attribute {attr} as None. Skipping.")
                continue
            self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
                object_name=ons_site_key_to_object_name[row["ons_site_key"]],
                attribute_name=attr,
                attribute_value=row[attr],
                on_conflict="update",
            )

    logger.info(f"Inserted {len(new_objects)} new sites. Updated {len(df) - len(new_objects)} sites.")

OnsOpenDataSiteLimitations(ons)

Class used to get limitation variables in 30 min intervals from ONS Open Data. Can be accessed via ons.opendata.sites.limitations.

The data is available in the following links:

  • Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
  • Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica
Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality and the connection handler.
    """
    self._ons: e_o.Ons = ons

get(period, output_type='DataFrame')

Gets all limitation variables within the given period from ONS Open Data.

The data is available in the following links:

  • Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
  • Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica

Parameters:

  • period

    (DateTimeRange) –

    The period for which the data will be retrieved.

  • output_type

    (Literal['DataFrame', 'pl.DataFrame'], default: "DataFrame" ) –

    The type of DataFrame to be returned. Can be either "DataFrame" for pandas DataFrame or "pl.DataFrame" for polars DataFrame.

Returns:

  • DataFrame | pl.DataFrame

    DataFrame containing the limitation variables from ONS Open Data. The following columns are available:

    • subsystem_key (str): the subsystem key
    • subsystem_name (str): the subsystem name
    • federation_state_key (str): the federation state key
    • federation_state_name (str): the federation state name
    • site_name (str): the site name
    • site_key (str): the site key
    • timestamp (datetime64[ns]): the timestamp of the data
    • ActivePowerVerified_30min.AVG (float): the active power verified in 30 min intervals
    • ActivePowerLimited_30min.AVG (float): the active power limited in 30 min intervals
    • ActivePowerAvailable_30min.AVG (float): the active power available in 30 min intervals
    • ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
    • ActivePowerReferenceFinal_30min.AVG (float): the active power reference final in 30 min intervals
    • LimitationReason_30min.REP (str): the limitation reason in 30 min intervals
    • LimitationOrigin_30min.REP (str): the limitation origin in 30 min intervals
Source code in echo_ons/ons_opendata_site_limitations.py
@validate_call
def get(self, period: DateTimeRange, output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame") -> pd.DataFrame | pl.DataFrame:
    """Gets all limitation variables within the given period from ONS Open Data.

    The data is available in the following links:

    - Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
    - Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.
    output_type : Literal["DataFrame", "pl.DataFrame"], default "DataFrame"
        The type of DataFrame to be returned. Can be either "DataFrame" for pandas DataFrame or "pl.DataFrame" for polars DataFrame.

    Returns
    -------
    pd.DataFrame | pl.DataFrame
        DataFrame containing the limitation variables from ONS Open Data. The following columns are available:

        - subsystem_key (str): the subsystem key
        - subsystem_name (str): the subsystem name
        - federation_state_key (str): the federation state key
        - federation_state_name (str): the federation state name
        - site_name (str): the site name
        - site_key (str): the site key
        - timestamp (datetime64[ns]): the timestamp of the data
        - ActivePowerVerified_30min.AVG (float): the active power verified in 30 min intervals
        - ActivePowerLimited_30min.AVG (float): the active power limited in 30 min intervals
        - ActivePowerAvailable_30min.AVG (float): the active power available in 30 min intervals
        - ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
        - ActivePowerReferenceFinal_30min.AVG (float): the active power reference final in 30 min intervals
        - LimitationReason_30min.REP (str): the limitation reason in 30 min intervals
        - LimitationOrigin_30min.REP (str): the limitation origin in 30 min intervals
    """
    # the files are available through AWS S3 in CSV, XLSX and Parquet formats (all with the same base name)
    # each file contains data for one month
    # the files are named as:
    # - wind: RESTRICAO_COFF_EOLICA_YYYY_MM.xlsx (or .csv or .parquet)
    # - solar: RESTRICAO_COFF_FOTOVOLTAICA_YYYY_MM.xlsx (or .csv or .parquet)
    base_urls = {
        "wind": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_eolica_tm/RESTRICAO_COFF_EOLICA",
        "solar": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_fotovoltaica_tm/RESTRICAO_COFF_FOTOVOLTAICA",
    }

    # defining the months that we will need to download
    get_period = period.copy()
    get_period.start = get_period.start.replace(day=1)
    get_periods = get_period.split_multiple(relativedelta(months=1))

    # iterating over the periods
    dfs = []
    for get_period in get_periods:
        # iterating over the site types
        for site_type, base_url in base_urls.items():
            # building the URL (first, let's try the Parquet file, which is the fastest option but not always available)
            url = f"{base_url}_{get_period.start.strftime('%Y_%m')}.parquet"
            try:
                t0 = perf_counter()
                logger.info(f"{site_type} - {get_period.start.strftime('%Y-%m')} - trying to get parquet file at '{url}'")
                df = pl.read_parquet(url)
                logger.info(f"Downloaded in {perf_counter() - t0:.2f}s")
            except Exception as e:
                url = f"{base_url}_{get_period.start.strftime('%Y_%m')}.csv"
                if isinstance(e, HTTPError) and e.code == 404:
                    logger.warning(
                        f"{site_type} - {get_period.start.strftime('%Y-%m')} - parquet file not found, trying to get csv file at '{url}'",
                    )
                else:
                    logger.exception(
                        f"{site_type} - {get_period.start.strftime('%Y-%m')} - could not get parquet file, trying to get csv file at '{url}'",
                    )
                try:
                    t0 = perf_counter()
                    df = pl.read_csv(url, separator=";", infer_schema_length=None)
                    logger.info(f"Downloaded in {perf_counter() - t0:.2f}s")
                except Exception as e2:
                    df = None
                    if isinstance(e2, HTTPError) and e2.code == 404:
                        logger.error(f"{site_type} - {get_period.start.strftime('%Y-%m')} - csv file not found")
                    else:
                        logger.exception(
                            f"{site_type} - {get_period.start.strftime('%Y-%m')} - could not get both parquet and csv files",
                        )

            # if we could not get the file, we skip to the next iteration
            if df is None:
                continue

            # in case val_ columns are stored as strings, convert them to numbers
            # (the files use "," as the decimal separator, so replace it with "." before casting)
            for col in df.select(pl.selectors.string()).columns:
                if col.startswith("val_"):
                    df = df.with_columns(
                        pl.col(col).str.replace(",", ".").cast(pl.Float64, strict=False),
                    )

            # checking whether din_instante is a string or a datetime; if it is a string, convert it to datetime
            if "din_instante" in df.select(pl.selectors.string()).columns:
                df = df.with_columns(
                    pl.col("din_instante").str.strptime(pl.Datetime, fmt="%Y-%m-%d %H:%M:%S", strict=False),
                )

            # adding the DataFrame to the list
            dfs.append(df)

    # getting features from the database to rename
    features = self._ons._perfdb.features.definitions.get(  # noqa: SLF001
        object_models=["ons_site_general"],
        data_source_types=["ons_open_data"],
        output_type="DataFrame",
    )
    features.index = features.index.droplevel("object_model_name")
    feature_rename = features.reset_index(drop=False).set_index("name_in_data_source")["name"].to_dict()

    # concatenating the DataFrames
    df: pl.DataFrame = pl.concat(dfs, how="vertical")

    # filtering the DataFrame to the period
    df = df.filter(
        (df["din_instante"] >= pl.lit(period.start)) & (df["din_instante"] <= pl.lit(period.end)),
    )

    # dropping not used columns
    df = df.drop(["ceg"])

    # renaming columns for easier understanding
    df = df.rename(
        {
            "id_subsistema": "subsystem_key",
            "nom_subsistema": "subsystem_name",
            "id_estado": "federation_state_key",
            "nom_estado": "federation_state_name",
            "nom_usina": "site_name",
            "id_ons": "site_key",
            "din_instante": "timestamp",
        }
        | feature_rename,
    )

    if output_type == "DataFrame":
        df = df.to_pandas(use_pyarrow_extension_array=True)

    return df
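
Example usage (a minimal sketch; it assumes an already initialized Ons instance named ons, and that a DateTimeRange can be built from start and end datetimes — the DateTimeRange construction below is hypothetical, check how that class is actually imported and constructed in your environment):

from datetime import datetime

period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1))  # hypothetical constructor call
limitations = ons.opendata.sites.limitations.get(period, output_type="pl.DataFrame")
print(limitations["timestamp"].min(), limitations["timestamp"].max())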

import_database(period)

Imports the limitations data from ONS Open Data to the local database.

Parameters:

  • period

    (DateTimeRange) –

    The period for which the data will be retrieved.

Source code in echo_ons/ons_opendata_site_limitations.py
@validate_call
def import_database(self, period: DateTimeRange) -> None:
    """Imports the limitations data from ONS Open Data to the local database.

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.
    """
    # getting features from the database to rename
    features = self._ons._perfdb.features.definitions.get(  # noqa: SLF001
        object_models=["ons_site_general"],
        data_source_types=["ons_open_data"],
        output_type="DataFrame",
    )
    features.index = features.index.droplevel("object_model_name")
    features = pl.from_pandas(features.reset_index(drop=False))

    # getting the data
    df = self.get(period, output_type="pl.DataFrame")

    limitation_reason_feature = features.filter(pl.col("name_in_data_source") == "cod_razaorestricao").select("name").item()
    limitation_origin_feature = features.filter(pl.col("name_in_data_source") == "cod_origemrestricao").select("name").item()

    # mapping LimitationReason_30min.REP values to integer codes: REL=1, CNF=2, ENE=3 (empty strings become null)
    df = df.with_columns(
        pl.col(limitation_reason_feature).replace(
            {"REL": 1, "CNF": 2, "ENE": 3, "": None},
            return_dtype=pl.Int64,
        ),
    )
    # mapping LimitationOrigin_30min.REP values to integer codes: SIS=1, LOC=2 (empty strings become null)
    df = df.with_columns(
        pl.col(limitation_origin_feature).replace(
            {"SIS": 1, "LOC": 2, "": None},
            return_dtype=pl.Int64,
        ),
    )

    # keeping only the columns needed for the database
    df = df[
        [
            "site_key",
            "timestamp",
            *features["name"].to_list(),
        ]
    ]

    # getting all the ONS Sites from the database to replace the site_key by their name in the database
    ons_sites = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    ons_sites = pl.from_pandas(ons_sites.reset_index(drop=False))

    # removing the sites that do not have ons_site_key attribute defined in the database
    ons_sites = ons_sites.filter(pl.col("ons_site_key").is_not_null())

    # making a dictionary to replace the site_key by the object_name
    rename = ons_sites.select(["ons_site_key", "name"]).to_pandas().set_index("ons_site_key")["name"].to_dict()

    # checking if all site_keys are in the dictionary
    missing_keys = set(df["site_key"].unique()) - set(rename.keys())
    if missing_keys:
        logger.warning(f"No ONS sites found for the following keys. They will not be imported. {missing_keys}")
        df = df.filter(~pl.col("site_key").is_in(list(missing_keys)))

    # replacing the site_key by the object_name
    df = df.with_columns(pl.col("site_key").replace(rename))

    # renaming the column
    df = df.rename({"site_key": "object_name"})

    # melting so all columns except object_name and timestamp become feature_name and value
    df = df.unpivot(index=["object_name", "timestamp"], variable_name="feature_name", value_name="value")

    # adding object_name as a prefix to feature_name
    df = df.with_columns(
        pl.concat_str([pl.col("object_name"), pl.lit("@"), pl.col("feature_name")]).alias("feature_name"),
    )

    # dropping object_name column
    df = df.drop("object_name")

    # pivoting the DataFrame to have timestamp as the index and the columns as feature_name
    df = df.pivot(values="value", index="timestamp", on="feature_name", aggregate_function="first")

    # inserting the data into the database
    self._ons._perfdb.features.values.series.insert(df=df, on_conflict="update", bazefield_upload=False)  # noqa: SLF001
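
Example usage (a minimal sketch; same assumptions as above regarding the ons instance and the hypothetical DateTimeRange construction):

from datetime import datetime

# import one month of limitation data into the local database
period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1))  # hypothetical constructor call
ons.opendata.sites.limitations.import_database(period)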