Skip to content

Open Data - Sites

OnsOpenDataSites(ons)

Class used to get the sites from ONS Open Data. Can be accessed via ons.opendata.sites.

The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

Parameters:

  • ons

    (Ons) –

    Top level object carrying all functionality.

Source code in echo_ons/ons_opendata_sites.py
def __init__(self, ons: e_o.Ons) -> None:
    """Class used to get the sites from ONS Open Data. Can be accessed via `ons.opendata.sites`.

    The initialization is delegated to the parent class and a limitations handler is attached
    to the instance as `limitations`.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality.
    """
    # the parent class receives the top level object
    super().__init__(ons)

    # * subclasses

    # handler for the limitations data, built from the same top level object
    self.limitations = OnsOpenDataSiteLimitations(ons)

get()

Gets all the sites from ONS Open Data.

The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

Returns:

  • DataFrame

    DataFrame containing the sites from ONS Open Data. The following columns are available:

    • id_subsistema
    • nom_subsistema
    • estad_id
    • nom_estado
    • id_ons_conjunto
    • nom_conjunto
Source code in echo_ons/ons_opendata_sites.py
def get(self) -> pd.DataFrame:
    """Return one row per site (conjunto) found in the ONS Open Data file.

    The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

    Sites that share the same name but have different keys are told apart by appending
    "_2", "_3", ... to the name of the second, third, ... occurrence.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the sites from ONS Open Data. The following columns are available:

        - id_subsistema
        - nom_subsistema
        - estad_id
        - nom_estado
        - id_ons_conjunto
        - nom_conjunto

    """
    # columns that describe the plants (usinas) and not the sites
    plant_columns = ["id_tipousina", "id_ons_usina", "nom_usina", "ceg", "dat_iniciorelacionamento", "dat_fimrelacionamento"]

    # one row per (site key, site name) pair
    sites = self._get_ons_file().drop_duplicates(subset=["id_ons_conjunto", "nom_conjunto"])

    # position of each row among the rows that share the same site name (0 for the first one)
    occurrence = sites.groupby("nom_conjunto").cumcount()
    suffix = occurrence.apply(lambda n: f"_{n + 1}" if n != 0 else "")
    sites["nom_conjunto"] = sites["nom_conjunto"].astype("str") + suffix

    # keeping only the site columns, without repeated rows and with a clean index
    return sites.drop(columns=plant_columns).drop_duplicates().reset_index(drop=True)

get_related_spes()

Gets all the related SPEs from ONS Open Data.

The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

Returns:

  • DataFrame

    DataFrame containing the related SPEs from ONS Open Data. The following columns are available:

    • site_key
    • site_name
    • spe_key (also known as CEG in ANEEL data)
    • spe_name
Source code in echo_ons/ons_opendata_sites.py
def get_related_spes(self) -> pd.DataFrame:
    """Return the SPEs listed in the ONS Open Data file together with the site each one belongs to.

    The data is available in the following link: https://dados.ons.org.br/dataset/usina_conjunto

    Returns
    -------
    pd.DataFrame
        DataFrame containing the related SPEs from ONS Open Data. The following columns are available:

        - site_key
        - site_name
        - spe_key (also known as CEG in ANEEL data)
        - spe_name
    """
    ignored_columns = [
        "id_tipousina",
        "id_ons_usina",
        "dat_iniciorelacionamento",
        "dat_fimrelacionamento",
        "id_subsistema",
        "nom_subsistema",
        "estad_id",
        "nom_estado",
    ]
    new_names = {"id_ons_conjunto": "site_key", "nom_conjunto": "site_name", "ceg": "spe_key", "nom_usina": "spe_name"}

    # keeping only the site and SPE identification columns, already with the output names
    spes = self._get_ons_file().drop(columns=ignored_columns).rename(columns=new_names)

    # one row per (site key, site name) pair, used to build the final site names
    site_names = spes[["site_key", "site_name"]].drop_duplicates()
    # repeated names get "_2", "_3", ... from their second occurrence onwards
    occurrence = site_names.groupby("site_name").cumcount()
    suffix = occurrence.apply(lambda n: f"_{n + 1}" if n != 0 else "")
    site_names["site_name"] = site_names["site_name"].astype("str") + suffix
    # the "Conj. " prefix is not part of the final name
    site_names["site_name"] = site_names["site_name"].str.replace("Conj. ", "")

    # swapping the raw site name for the final one
    spes = spes.drop(columns=["site_name"]).merge(site_names, on="site_key", how="left")

    # the trailing ".XX" of spe_key is only a verifying digit and might cause problems when joining with ANEEL data
    spes["spe_key"] = spes["spe_key"].str.replace(r"\.\d*$", "", regex=True)

    output_columns = ["site_key", "site_name", "spe_key", "spe_name"]
    return spes[output_columns]

import_database(skip_existing=True)

Imports the sites from ONS Open Data to the database.

Parameters:

  • skip_existing

    (bool, default: True ) –

    If False, overwrites all attributes of the existing sites; if True, skips the sites that already exist in the database. By default True.

Source code in echo_ons/ons_opendata_sites.py
def import_database(self, skip_existing: bool = True) -> None:
    """Imports the sites from ONS Open Data to the database.

    The steps are:

    1. The sites are read with `get()` and compared, by `ons_site_key`, with the objects of type
       `ons_site_general` that already exist in the database.
    2. Sites that do not exist yet are created with the name "ONS-Site-<name>" (with "Conj. " removed from
       the name) and receive the attributes `exists_in_bazefield` (False) and `ons_site_key`.
    3. The related SPEs that exist in the database (object type `ons_spe_general`) are grouped by site to
       build the `ons_spes` attribute and the aggregated attributes `latitude`, `longitude` (mean),
       `nominal_power` (sum) and `connection_point` (first non NaN value).
    4. The attributes of every site being imported are inserted/updated. Attributes with NaN value are
       skipped with a warning.

    If there is nothing to import the method logs it and returns without touching the database.

    Parameters
    ----------
    skip_existing : bool, optional
        If set to False will overwrite all attributes of the existing sites, if True will skip the sites that already exist in the database. By default True

    Raises
    ------
    ValueError
        If `skip_existing` is not a boolean.
    """
    # checking arguments
    if not isinstance(skip_existing, bool):
        raise ValueError("skip_existing must be a boolean.")

    # getting the sites
    df = self.get()

    # getting all the existing sites
    existing = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    # adding column ons_site_key if it does not exist (so the comparisons below work even when no object has this attribute)
    if "ons_site_key" not in existing.columns:
        existing["ons_site_key"] = pd.NA
        existing["ons_site_key"] = existing["ons_site_key"].astype("string[pyarrow]")

    # getting only the non existing sites if skip_existing is True
    if skip_existing:
        df = df[~df["id_ons_conjunto"].isin(existing["ons_site_key"])].copy()

    # nothing left to import, so we stop before any write to the database
    if df.empty:
        logger.info("No new sites to import.")
        return

    # renaming columns to the attribute names used in the database
    df = df.rename(
        columns={
            "id_ons_conjunto": "ons_site_key",
            "nom_conjunto": "name",
            "nom_subsistema": "ons_subsystem",
            "nom_estado": "federation_state",
        },
    )
    # remove "Conj. " from name
    df["name"] = df["name"].str.replace("Conj. ", "")
    # adding "ONS-Site-" to name
    df["name"] = "ONS-Site-" + df["name"]

    # first lets add the objects that are not in the database
    # (when skip_existing is True df was already filtered with this same condition, so new_objects is the whole df)
    new_objects = df[~df["ons_site_key"].isin(existing["ons_site_key"])].copy()
    for row in new_objects[["name", "ons_site_key"]].to_dict(orient="records"):
        self._ons._perfdb.objects.instances.insert(  # noqa: SLF001
            object_name=row["name"],
            object_model_name="ons_site_general",
            description=f"""Site "{row["name"].replace("ONS-Site-", "")}" from ONS Open Data (created automatically)""",
            on_conflict="ignore",
        )
        # now lets define exists_in_bazefield to False
        self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
            object_name=row["name"],
            attribute_name="exists_in_bazefield",
            attribute_value=False,
            on_conflict="update",
        )
        # also defining the ons_site_key
        self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
            object_name=row["name"],
            attribute_name="ons_site_key",
            attribute_value=row["ons_site_key"],
            on_conflict="update",
        )

    # getting all objects again to get the new ones
    existing = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    # creating a mapping from ons_site_key to object_name
    ons_site_key_to_object_name = existing[["ons_site_key"]].reset_index(drop=False).set_index("ons_site_key")["name"].to_dict()

    # * related SPEs

    # finding the related SPEs
    related_spes = self.get_related_spes()
    # getting all spes from the database to create the mapping
    spes = self._ons._perfdb.objects.instances.get(object_types=["ons_spe_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    spes = (
        spes.reset_index(drop=False)
        .rename(columns={"name": "spe_name_db", "ons_spe_key": "spe_key"})[
            ["spe_name_db", "spe_key", "latitude", "longitude", "nominal_power", "connection_point"]
        ]
        .copy()
    )
    # merging the related_spes with the spes to get the name and attributes of each SPE in the database
    related_spes = related_spes.merge(spes, left_on="spe_key", right_on="spe_key", how="left")
    # merging the related_spes with the sites to get the object_name in the database
    sites_db = (
        existing.reset_index(drop=False)
        .rename(columns={"name": "site_name_db", "ons_site_key": "site_key"})[["site_name_db", "site_key"]]
        .copy()
    )
    related_spes = related_spes.merge(sites_db, left_on="site_key", right_on="site_key", how="left")
    # dropping rows where spe_name_db is NaN (SPEs from ONS Open Data that were not found in the database)
    related_spes = related_spes[~related_spes["spe_name_db"].isna()].copy()

    # now lets group by site_name_db to create, for each site, one list per column (SPE names and SPE attributes)
    related_spes = (
        related_spes[["site_name_db", "spe_name_db", "latitude", "longitude", "nominal_power", "connection_point"]]
        .astype({"spe_name_db": "string"})
        .groupby("site_name_db")
        .agg(list)
    )

    # now lets merge the related_spes with df to use it as the ons_spes attribute
    # NOTE(review): this join matches df["name"] (built above from the ONS data) with the object name stored in the
    # database; presumably both are always equal - confirm for sites whose database name differs from "ONS-Site-<name>"
    df = df.merge(related_spes.rename(columns={"spe_name_db": "ons_spes"}), left_on="name", right_index=True, how="left")

    # * coordinates and other SPE aggregate attributes

    # sites without any related SPE do not have a list after the merge above, so they get NA in all the columns below

    # getting latitude and longitude as the average of these attributes from the related_spes
    # NOTE(review): the lists may contain NaN when a SPE has no coordinates defined - confirm how mean handles it
    df["latitude"] = df["latitude"].apply(lambda x: mean(x) if isinstance(x, list) else pd.NA)
    df["longitude"] = df["longitude"].apply(lambda x: mean(x) if isinstance(x, list) else pd.NA)
    # nominal power (sum of the nominal power of the related SPEs)
    df["nominal_power"] = df["nominal_power"].apply(lambda x: sum(x) if isinstance(x, list) else pd.NA)
    # connection point (first non NaN value in list)
    df["connection_point"] = df["connection_point"].apply(
        lambda x: next((i for i in x if not pd.isna(i)), pd.NA) if isinstance(x, list) else pd.NA,
    )

    # forcing the types of the aggregated columns
    df = df.astype({"latitude": "float64", "longitude": "float64", "nominal_power": "float64", "connection_point": "string[pyarrow]"})
    # * data insertion

    # then lets update the attributes
    attrs = ["ons_subsystem", "federation_state", "ons_spes", "latitude", "longitude", "nominal_power", "connection_point"]
    for row in df[["ons_site_key", *attrs]].to_dict(orient="records"):
        for attr in attrs:
            # lists (ons_spes) are never tested with pd.isna, only scalar values are
            if not isinstance(row[attr], list) and pd.isna(row[attr]):
                logger.warning(f"Site {row['ons_site_key']} has attribute {attr} as NaN. Skipping.")
                continue
            # the object is located by its ons_site_key, using the names read back from the database
            self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
                object_name=ons_site_key_to_object_name[row["ons_site_key"]],
                attribute_name=attr,
                attribute_value=row[attr],
                on_conflict="update",
            )

    # sites in df that were not created in this call are reported as updated
    logger.info(f"Inserted {len(new_objects)} new sites. Updated {len(df) - len(new_objects)} sites.")

OnsOpenDataSiteLimitations(ons)

Class used to get limitation variables in 30 min intervals from ONS Open Data. Can be accessed via ons.opendata.sites.limitations.

The data is available in the following links:

  • Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
  • Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica
Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
    """Store the top level `Ons` object used by the classes that inherit from this base class.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality and the connection handler.

    Raises
    ------
    ValueError
        If `ons` is not an instance of `Ons`.
    """
    # anything other than the top level Ons object is rejected
    received_ons = isinstance(ons, e_o.Ons)
    if not received_ons:
        raise ValueError(f"ons must be of type Ons, not {type(ons)}")

    self._ons: e_o.Ons = ons

get(period)

Gets all limitation variables within the given period from ONS Open Data.

The data is available in the following links:

  • Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
  • Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica

Parameters:

  • period

    (DateTimeRange) –

    The period for which the data will be retrieved.

Returns:

  • DataFrame

    DataFrame containing the limitation data from ONS Open Data. The following columns are available:

    • subsystem_key (str): the subsystem key
    • subsystem_name (str): the subsystem name
    • federation_state_key (str): the federation state key
    • federation_state_name (str): the federation state name
    • site_name (str): the site name
    • site_key (str): the site key
    • timestamp (datetime64[ns]): the timestamp of the data
    • ActivePowerVerified_30min.AVG (float): the active power verified in 30 min intervals
    • ActivePowerLimited_30min.AVG (float): the active power limited in 30 min intervals
    • ActivePowerAvailable_30min.AVG (float): the active power available in 30 min intervals
    • ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
    • ActivePowerReferenceFinal_30min.AVG (float): the active power reference final in 30 min intervals
    • LimitationReason_30min.REP (str): the limitation reason in 30 min intervals
    • LimitationOrigin_30min.REP (str): the limitation origin in 30 min intervals
Source code in echo_ons/ons_opendata_site_limitations.py
def get(self, period: DateTimeRange) -> pd.DataFrame:
    """Gets all limitation variables within the given period from ONS Open Data.

    The data is available in the following links:

    - Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
    - Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica

    One file per month and per site type (wind and solar) is downloaded to a temporary directory, trying the
    parquet file first and falling back to the csv file. Files that could not be downloaded are logged and skipped.
    The temporary directory is always removed, even when an error is raised.

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the limitation data from ONS Open Data. The following columns are available:

        - subsystem_key (str): the subsystem key
        - subsystem_name (str): the subsystem name
        - federation_state_key (str): the federation state key
        - federation_state_name (str): the federation state name
        - site_name (str): the site name
        - site_key (str): the site key
        - timestamp (datetime64[ns]): the timestamp of the data
        - ActivePowerVerified_30min.AVG (float): the active power verified in 30 min intervals
        - ActivePowerLimited_30min.AVG (float): the active power limited in 30 min intervals
        - ActivePowerAvailable_30min.AVG (float): the active power available in 30 min intervals
        - ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
        - ActivePowerReferenceFinal_30min.AVG (float): the active power reference final in 30 min intervals
        - LimitationReason_30min.REP (str): the limitation reason in 30 min intervals
        - LimitationOrigin_30min.REP (str): the limitation origin in 30 min intervals

    Raises
    ------
    ValueError
        If no file could be downloaded for the given period.
    """
    # imported here so that this method does not rely on a change to the module level imports
    import tempfile

    # the files are available through AWS S3 in CSV, xlsx and parquet formats (all with the same name)
    # each file contains data for one month
    # the URLs below are completed with "_YYYY_MM" and the extension, resulting in files named as:
    # - wind: RESTRICAO_COFF_EOLICA_YYYY_MM.parquet (or .csv)
    # - solar: RESTRICAO_COFF_FOTOVOLTAICA_YYYY_MM.parquet (or .csv)
    base_urls = {
        "wind": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_eolica_tm/RESTRICAO_COFF_EOLICA",
        "solar": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_fotovoltaica_tm/RESTRICAO_COFF_FOTOVOLTAICA",
    }

    # defining the months that we will need to download
    get_period = period.copy()
    get_period.start = get_period.start.replace(day=1)
    get_periods = get_period.split_multiple(relativedelta(months=1))

    dfs = []
    # a private temporary directory is used so that nothing that already exists in the working directory is touched
    # the context manager removes it when the block ends, including when an exception is raised
    with tempfile.TemporaryDirectory(prefix="ons_limitations_") as tmp_dir:
        tmp_folder = Path(tmp_dir)

        # iterating over the months
        for month_period in get_periods:
            month_tag = month_period.start.strftime("%Y_%m")
            month_label = month_period.start.strftime("%Y-%m")

            # iterating over the site types
            for site_type, base_url in base_urls.items():
                # creating the url (first lets try to get the parquet file, which is the fastest but not always available)
                url = f"{base_url}_{month_tag}.parquet"
                try:
                    t0 = perf_counter()
                    logger.info(f"{site_type} - {month_label} - trying to get parquet file at '{url}'")
                    file_name = tmp_folder / f"{site_type}_{month_tag}.parquet"
                    urlretrieve(url, file_name)
                    logger.info(f"Downloaded in {perf_counter() - t0:.2f}s to '{file_name}'")
                except Exception as e:
                    # any failure with the parquet file makes us fall back to the csv file
                    url = f"{base_url}_{month_tag}.csv"
                    if isinstance(e, HTTPError) and e.code == 404:
                        logger.warning(
                            f"{site_type} - {month_label} - parquet file not found, trying to get csv file at '{url}'",
                        )
                    else:
                        logger.exception(
                            f"{site_type} - {month_label} - could not get parquet file, trying to get csv file at '{url}'",
                        )
                    try:
                        t0 = perf_counter()
                        file_name = tmp_folder / f"{site_type}_{month_tag}.csv"
                        urlretrieve(url, file_name)
                        logger.info(f"Downloaded in {perf_counter() - t0:.2f}s to '{file_name}'")
                    except Exception as e2:
                        # None marks that there is no file to read for this month and site type
                        file_name = None
                        if isinstance(e2, HTTPError) and e2.code == 404:
                            logger.error(f"{site_type} - {month_label} - csv file not found")
                        else:
                            logger.exception(
                                f"{site_type} - {month_label} - could not get both parquet and csv files",
                            )

                # if we could not get the file, we skip to the next iteration
                if file_name is None:
                    continue

                # reading the file
                if file_name.suffix == ".parquet":
                    df = pd.read_parquet(file_name, engine="pyarrow", dtype_backend="pyarrow")
                elif file_name.suffix == ".csv":
                    df = pd.read_csv(file_name, sep=";", decimal=".", engine="pyarrow", dtype_backend="pyarrow")

                # forcing type casting to avoid errors
                df = df.astype(
                    {
                        "id_subsistema": "string[pyarrow]",
                        "nom_subsistema": "string[pyarrow]",
                        "id_estado": "string[pyarrow]",
                        "nom_estado": "string[pyarrow]",
                        "nom_usina": "string[pyarrow]",
                        "id_ons": "string[pyarrow]",
                        "ceg": "string[pyarrow]",
                        "val_geracao": "double[pyarrow]",
                        "val_geracaolimitada": "double[pyarrow]",
                        "val_disponibilidade": "double[pyarrow]",
                        "val_geracaoreferencia": "double[pyarrow]",
                        "val_geracaoreferenciafinal": "double[pyarrow]",
                        "cod_razaorestricao": "string[pyarrow]",
                        "cod_origemrestricao": "string[pyarrow]",
                    },
                )
                # checking if din_instante is a string or a datetime, if a string we convert it to datetime
                if df["din_instante"].dtype == "string":
                    df["din_instante"] = pd.to_datetime(df["din_instante"], format="%Y-%m-%d %H:%M:%S")
                df["din_instante"] = df["din_instante"].astype("datetime64[s]")

                # adding the DataFrame to the list
                dfs.append(df)

    # without any file pd.concat would fail with a message that does not explain the cause,
    # so the problem is reported here (with the same exception type) before the database is queried
    if not dfs:
        raise ValueError(f"No limitation data could be downloaded from ONS Open Data for the period {period}.")

    # getting features from the database to rename
    features = self._ons._perfdb.features.definitions.get(  # noqa: SLF001
        object_models=["ons_site_general"],
        data_source_types=["ons_open_data"],
        output_type="DataFrame",
    )
    features.index = features.index.droplevel("object_model_name")
    feature_rename = features.reset_index(drop=False).set_index("name_in_data_source")["name"].to_dict()

    # concatenating the DataFrames
    df = pd.concat(dfs, ignore_index=True)

    # filtering the DataFrame to the period (whole months were downloaded)
    df = df[(df["din_instante"] >= period.start) & (df["din_instante"] <= period.end)]

    # dropping not used columns
    df = df.drop(columns=["ceg"])

    # renaming columns for easier understanding (the feature columns get the names defined in the database)
    df = df.rename(
        columns={
            "id_subsistema": "subsystem_key",
            "nom_subsistema": "subsystem_name",
            "id_estado": "federation_state_key",
            "nom_estado": "federation_state_name",
            "nom_usina": "site_name",
            "id_ons": "site_key",
            "din_instante": "timestamp",
        }
        | feature_rename,
    )

    return df

import_database(period)

Imports the limitations data from ONS Open Data to the local database.

Parameters:

  • period

    (DateTimeRange) –

    The period for which the data will be retrieved.

Source code in echo_ons/ons_opendata_site_limitations.py
def import_database(self, period: DateTimeRange) -> None:
    """Download the limitations data from ONS Open Data and store it in the local database.

    The data returned by `get` is reduced to the feature columns defined in the database, the limitation
    reason and origin codes are converted to numbers, the site keys are replaced by the names of the sites in
    the database and the result is inserted as feature values. Sites that are not in the database are skipped
    with a warning.

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.

    Raises
    ------
    TypeError
        If `period` is not a DateTimeRange object.
    """
    # checking inputs
    if not isinstance(period, DateTimeRange):
        raise TypeError(f"period must be a DateTimeRange object. Got {type(period)}.")

    # feature definitions, indexed only by the feature name
    feature_defs = self._ons._perfdb.features.definitions.get(  # noqa: SLF001
        object_models=["ons_site_general"],
        data_source_types=["ons_open_data"],
        output_type="DataFrame",
    )
    feature_defs.index = feature_defs.index.droplevel("object_model_name")

    # downloading the data
    data = self.get(period)

    # names (in the database) of the features that hold the limitation reason and the limitation origin
    source_names = feature_defs["name_in_data_source"]
    reason_feature = feature_defs[source_names == "cod_razaorestricao"].index[0]
    origin_feature = feature_defs[source_names == "cod_origemrestricao"].index[0]

    # the codes are stored as numbers: reason REL=1, CNF=2, ENE=3 and origin SIS=1, LOC=2
    # any value that is not one of these codes becomes NaN
    reason_codes = {"REL": 1, "CNF": 2, "ENE": 3}
    origin_codes = {"SIS": 1, "LOC": 2}
    data[reason_feature] = data[reason_feature].map(reason_codes)
    data[origin_feature] = data[origin_feature].map(origin_codes)
    data = data.astype({reason_feature: "double[pyarrow]", origin_feature: "double[pyarrow]"})

    # only the site key, the timestamp and the features go to the database
    wanted_columns = ["site_key", "timestamp", *feature_defs.index.to_list()]
    data = data[wanted_columns].copy()

    # ONS sites of the database that have the ons_site_key attribute defined
    db_sites = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    db_sites = db_sites[db_sites["ons_site_key"].notna()].copy()

    # mapping from ons_site_key to the name of the object in the database
    key_to_name = db_sites.reset_index(drop=False).set_index("ons_site_key")["name"].to_dict()

    # rows of sites that are not in the database are discarded
    unknown_keys = set(data["site_key"].unique()) - set(key_to_name)
    if unknown_keys:
        logger.warning(f"No ONS sites found for the following keys. They will not be imported. {unknown_keys}")
        data = data[~data["site_key"].isin(unknown_keys)].copy()

    # the site key gives place to the object name
    data["site_key"] = data["site_key"].map(key_to_name)
    data = data.rename(columns={"site_key": "object_name"})

    # long format: one row per object, timestamp and feature
    long_data = data.melt(id_vars=["object_name", "timestamp"], var_name="feature_name", value_name="value")

    # wide format: timestamp as the index and a MultiIndex with object_name and feature_name as the columns
    wide_data = long_data.pivot_table(values="value", index="timestamp", columns=["object_name", "feature_name"])

    # inserting the data into the database
    self._ons._perfdb.features.values.series.insert(df=wide_data, on_conflict="update", bazefield_upload=False)  # noqa: SLF001