Open Data - SPEs¶
OnsOpenDataSpes(ons)¶
Class used to get the SPEs from ONS Open Data. Can be accessed via ons.opendata.spes.
The data retrieved here is a mix of the following sources:
- Names/keys of the relevant SPEs: https://dados.ons.org.br/dataset/usina_conjunto
- Metadata about the SPEs: https://dadosabertos.aneel.gov.br/dataset/siga-sistema-de-informacoes-de-geracao-da-aneel
Parameters:
- ons (Ons) – Top level object carrying all functionality.
Source code in echo_ons/ons_opendata_spes.py
def __init__(self, ons: e_o.Ons) -> None:
"""Class used to get the sites from ONS Open Data. Can be accessed via `ons.opendata.spes`.
Parameters
----------
ons : Ons
Top level object carrying all functionality.
"""
super().__init__(ons)
# * subclasses
self.limitations = OnsOpenDataSpeLimitations(ons)
get()¶
Gets all the SPEs from ONS Open Data.
The names/keys of the relevant SPEs are retrieved from the following link: https://dados.ons.org.br/dataset/usina_conjunto
To get metadata about the SPEs we use the following link: https://dadosabertos.aneel.gov.br/dataset/siga-sistema-de-informacoes-de-geracao-da-aneel
Returns:
- DataFrame – DataFrame containing the SPEs from ONS Open Data. The following columns are available:
- spe_name
- spe_key (also known as CEG in ANEEL data)
- NomEmpreendimento
- IdeNucleoCEG
- SigUFPrincipal
- SigTipoGeracao
- DscFaseUsina
- DscOrigemCombustivel
- DscFonteCombustivel
- DscTipoOutorga
- NomFonteCombustivel
- DatEntradaOperacao
- MdaPotenciaOutorgadaKw
- MdaPotenciaFiscalizadaKw
- MdaGarantiaFisicaKw
- IdcGeracaoQualificada
- NumCoordNEmpreendimento
- NumCoordEEmpreendimento
- DatInicioVigencia
- DatFimVigencia
- DscPropriRegimePariticipacao
- DscSubBacia
- DscMuninicpios
- nom_pontoconexao
Source code in echo_ons/ons_opendata_spes.py
def get(self) -> pd.DataFrame:
"""Gets all the spes from ONS Open Data.
The names/keys of the relevant SPEs are retrieved from following link: https://dados.ons.org.br/dataset/usina_conjunto
To get metadata about the SPEs we use the following link: https://dadosabertos.aneel.gov.br/dataset/siga-sistema-de-informacoes-de-geracao-da-aneel
Returns
-------
pd.DataFrame
DataFrame containing the SPEs from ONS Open Data. The following columns are available:
- spe_name
- spe_key (also known as CEG in ANEEL data)
- NomEmpreendimento
- IdeNucleoCEG
- SigUFPrincipal
- SigTipoGeracao
- DscFaseUsina
- DscOrigemCombustivel
- DscFonteCombustivel
- DscTipoOutorga
- NomFonteCombustivel
- DatEntradaOperacao
- MdaPotenciaOutorgadaKw
- MdaPotenciaFiscalizadaKw
- MdaGarantiaFisicaKw
- IdcGeracaoQualificada
- NumCoordNEmpreendimento
- NumCoordEEmpreendimento
- DatInicioVigencia
- DatFimVigencia
- DscPropriRegimePariticipacao
- DscSubBacia
- DscMuninicpios
- nom_pontoconexao
"""
# getting parquet file from ONS S3 bucket
url = "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/usina_conjunto/RELACIONAMENTO_USINA_CONJUNTO.parquet"
# creating temporary folder
tmp_folder = Path("./tmp/")
tmp_folder.mkdir(exist_ok=True)
# downloading the file to temp folder
logger.info(f"Downloading file from {url}")
t0 = perf_counter()
urlretrieve(url, tmp_folder / "RELACIONAMENTO_USINA_CONJUNTO.parquet")
logger.info(f"Downloaded in {perf_counter() - t0:.2f}s")
# reading the file
ons_df = pd.read_parquet(tmp_folder / "RELACIONAMENTO_USINA_CONJUNTO.parquet", dtype_backend="pyarrow", engine="pyarrow")
# getting only the names of the SPEs and their keys
ons_df = ons_df[["nom_usina", "ceg"]].rename(columns={"nom_usina": "spe_name", "ceg": "spe_key"})
ons_df = ons_df.drop_duplicates(subset=["spe_name", "spe_key"])
# removing last ".XX" from spe_key (its only a verifying digit and might cause problems when joining with ANEEL data)
ons_df["spe_key"] = ons_df["spe_key"].str.replace(r"\.\d*$", "", regex=True)
# getting metadata from ANEEL
url = "https://dadosabertos.aneel.gov.br/dataset/6d90b77c-c5f5-4d81-bdec-7bc619494bb9/resource/11ec447d-698d-4ab8-977f-b424d5deee6a/download/siga-empreendimentos-geracao.csv"
# downloading the file to temp folder
logger.info(f"Downloading file from {url}")
t0 = perf_counter()
urlretrieve(url, tmp_folder / "siga-empreendimentos-geracao.csv")
logger.info(f"Downloaded in {perf_counter() - t0:.2f}s")
# reading the file
aneel_df = pd.read_csv(
tmp_folder / "siga-empreendimentos-geracao.csv",
sep=";",
encoding="cp1252",
engine="pyarrow",
dtype_backend="pyarrow",
)
# removing check digit from CodCEG
aneel_df["CodCEG"] = aneel_df["CodCEG"].str.replace(r"\.\d*$", "", regex=True)
# merging the DataFrames
ons_df = ons_df.merge(aneel_df, left_on="spe_key", right_on="CodCEG", how="left")
# dropping unnecessary columns
ons_df = ons_df.drop(columns=["CodCEG", "DatGeracaoConjuntoDados"])
# getting data from MODALIDADE USINA to get the connection point
url = "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/modalidade_usina/MODALIDADE_USINA.parquet"
# downloading the file to temp folder
logger.info(f"Downloading file from {url}")
t0 = perf_counter()
urlretrieve(url, tmp_folder / "MODALIDADE_USINA.parquet")
logger.info(f"Downloaded in {perf_counter() - t0:.2f}s")
# reading the file
modalidade_df = pd.read_parquet(tmp_folder / "MODALIDADE_USINA.parquet", dtype_backend="pyarrow", engine="pyarrow")
# removing last ".XX" from spe_key (its only a verifying digit and might cause problems when joining with ANEEL data)
modalidade_df["ceg"] = modalidade_df["ceg"].str.replace(r"\.\d*$", "", regex=True)
# only getting relevant columns
modalidade_df = modalidade_df[["ceg", "nom_pontoconexao"]]
# forcing nom_pontoconexao to be a string
modalidade_df["nom_pontoconexao"] = modalidade_df["nom_pontoconexao"].astype("string[pyarrow]")
# merging the DataFrames
ons_df = ons_df.merge(modalidade_df, left_on="spe_key", right_on="ceg", how="left")
# dropping unnecessary columns
ons_df = ons_df.drop(columns=["ceg"])
# removing temporary folder
rmtree(tmp_folder, ignore_errors=True)
return ons_df
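A minimal usage sketch. This page does not show how an Ons instance is constructed, so the no-argument constructor below is an assumption:
import echo_ons as e_o
ons = e_o.Ons()  # constructor arguments, if any, are an assumption here
spes_df = ons.opendata.spes.get()
# inspect a few identifying columns
print(spes_df[["spe_name", "spe_key", "SigTipoGeracao", "DscMuninicpios"]].head())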
import_database(skip_existing=True)¶
Imports the SPEs from ONS Open Data to the database.
Parameters:
- skip_existing (bool, default: True) – If False, all attributes of existing SPEs are overwritten; if True, SPEs that already exist in the database are skipped. By default True.
Source code in echo_ons/ons_opendata_spes.py
def import_database(self, skip_existing: bool = True) -> None:
"""Imports the SPEs from ONS Open Data to the database.
Parameters
----------
skip_existing : bool, optional
If False, all attributes of existing SPEs are overwritten; if True, SPEs that already exist in the database are skipped. By default True.
"""
# checking arguments
if not isinstance(skip_existing, bool):
raise TypeError("skip_existing must be a boolean.")
# getting the SPEs
df = self.get()
# getting all the existing spes
existing = self._ons._perfdb.objects.instances.get(object_types=["ons_spe_general"], get_attributes=True, output_type="DataFrame") # noqa: SLF001
# adding column ons_spe_key if it does not exist
if "ons_spe_key" not in existing.columns:
existing["ons_spe_key"] = pd.NA
existing["ons_spe_key"] = existing["ons_spe_key"].astype("string[pyarrow]")
# getting only the non existing spes if skip_existing is True
if skip_existing:
df = df[~df["spe_key"].isin(existing["ons_spe_key"])].copy()
if df.empty:
logger.info("No new SPEs to import.")
return
# renaming columns
df = df.rename(
columns={
"spe_key": "ons_spe_key",
"spe_name": "name",
"NomEmpreendimento": "long_name",
"SigTipoGeracao": "ons_spe_type",
"DatEntradaOperacao": "commercial_operation_date",
"MdaPotenciaFiscalizadaKw": "nominal_power",
"MdaGarantiaFisicaKw": "physical_guarantee",
"NumCoordNEmpreendimento": "latitude",
"NumCoordEEmpreendimento": "longitude",
"DscMuninicpios": "city",
"nom_pontoconexao": "connection_point",
},
)
# dropping where name or spe_key is NaN
df = df[~df["name"].isna() & ~df["ons_spe_key"].isna()].copy()
# dropping where ons_spe_type is None
df = df[~df["ons_spe_type"].isna()].copy()
# adding "ONS-SPE-" to name
df["name"] = "ONS-SPE-" + df["ons_spe_type"] + "-" + df["name"]
# converting dates from YYYY-MM-DD to datetime
df["commercial_operation_date"] = pd.to_datetime(df["commercial_operation_date"], errors="coerce")
# changing , to . and converting to float
for col in ["nominal_power", "physical_guarantee", "latitude", "longitude"]:
df[col] = df[col].astype("string").str.replace(",", ".", regex=False)
# converting types
df = df.astype(
{
"nominal_power": "double[pyarrow]",
"physical_guarantee": "double[pyarrow]",
"latitude": "double[pyarrow]",
"longitude": "double[pyarrow]",
},
)
# first lets add the objects that are not in the database
new_objects = df[~df["ons_spe_key"].isin(existing["ons_spe_key"])].copy()
for row in new_objects[["name", "ons_spe_key", "long_name"]].to_dict(orient="records"):
try:
self._ons._perfdb.objects.instances.insert( # noqa: SLF001
object_name=row["name"],
object_model_name="ons_spe_general",
description=f"""SPE "{row['long_name']}" from ONS Open Data (created automatically)""",
)
except Exception:
logger.exception(f"Error inserting SPE {row['name']} into the database. It might already exist.")
# now lets define exists_in_bazefield to False
self._ons._perfdb.objects.instances.attributes.insert( # noqa: SLF001
object_name=row["name"],
attribute_name="exists_in_bazefield",
attribute_value=False,
on_conflict="update",
)
# also defining the ons_spe_key
self._ons._perfdb.objects.instances.attributes.insert( # noqa: SLF001
object_name=row["name"],
attribute_name="ons_spe_key",
attribute_value=row["ons_spe_key"],
on_conflict="update",
)
# getting all objects again to get the new ones
existing = self._ons._perfdb.objects.instances.get(object_types=["ons_spe_general"], get_attributes=True, output_type="DataFrame") # noqa: SLF001
# creating a mapping from ons_spe_key to object_name
ons_spe_key_to_object_name = existing[["ons_spe_key"]].reset_index(drop=False).set_index("ons_spe_key")["name"].to_dict()
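# e.g. {"EOL.CV.028012-2": "ONS-SPE-EOL-Ventos Bons"} (hypothetical key and name)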
# now lets update the attributes
attrs = [
"ons_spe_type",
"commercial_operation_date",
"nominal_power",
"physical_guarantee",
"latitude",
"longitude",
"city",
"connection_point",
]
for row in df[["ons_spe_key", *attrs]].to_dict(orient="records"):
object_name = ons_spe_key_to_object_name[row["ons_spe_key"]]
for attr in attrs:
if pd.isna(row[attr]):
logger.warning(f"Attribute {attr} for SPE {object_name} is NaN. Skipping.")
continue
self._ons._perfdb.objects.instances.attributes.insert( # noqa: SLF001
object_name=object_name,
attribute_name=attr,
attribute_value=row[attr],
on_conflict="update",
)
logger.info(f"Inserted {len(new_objects)} new SPEs. Updated {len(df) - len(new_objects)} SPEs.")
OnsOpenDataSpeLimitations(ons)¶
Class used to get limitation variables for SPEs in 30 min intervals from ONS Open Data. Can be accessed via ons.opendata.spes.limitations.
The data is available at the following links:
- Wind SPEs: https://dados.ons.org.br/dataset/restricao_coff_eolica_detail
- Solar SPEs: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica_detail
Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
ons : Ons
Top level object carrying all functionality and the connection handler.
"""
# check inputs
if not isinstance(ons, e_o.Ons):
raise TypeError(f"ons must be of type Ons, not {type(ons)}")
self._ons: e_o.Ons = ons
get(period)¶
Gets all limitation variables within the given period from ONS Open Data.
The data is available at the following links:
- Wind SPEs: https://dados.ons.org.br/dataset/restricao_coff_eolica_detail
- Solar SPEs: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica_detail
Parameters:
- period (DateTimeRange) – The period for which the data will be retrieved.
Returns:
- DataFrame – DataFrame containing the limitation data from ONS Open Data. The following columns are available:
- subsystem_key (str): the subsystem key
- federation_state_key (str): the federation state key
- site_name (str): the site name
- spe_name (str): the SPE name
- spe_ons_key (str): the SPE ONS key
- spe_key (str): the SPE key (CEG ANEEL)
- timestamp (datetime64[ns]): the timestamp of the data
- WindSpeed_30min.AVG (float): the wind speed in 30 min intervals (only applicable for wind SPEs)
- WindSpeedValid_30min.REP (float): the wind speed validity in 30 min intervals (only applicable for wind SPEs)
- Irradiance_30min.AVG (float): the irradiance in 30 min intervals (only applicable for solar SPEs)
- IrradianceValid_30min.REP (float): the irradiance validity in 30 min intervals (only applicable for solar SPEs)
- ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
- ActivePowerReferenceFinal_30min.AVG (float): the active power reference final in 30 min intervals
Source code in echo_ons/ons_opendata_spe_limitations.py
def get(self, period: DateTimeRange) -> pd.DataFrame:
"""Gets all limitation variables within the given period from ONS Open Data.
The data is available in the following links:
- Wind spes: https://dados.ons.org.br/dataset/restricao_coff_eolica_detail
- Solar spes: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica_detail
Parameters
----------
period : DateTimeRange
The period for which the data will be retrieved.
Returns
-------
pd.DataFrame
DataFrame containing the limitation data from ONS Open Data. The following columns are available:
- subsystem_key (str): the subsystem key
- federation_state_key (str): the federation state key
- site_name (str): the site name
- spe_name (str): the SPE name
- spe_ons_key (str): the SPE ONS key
- spe_key (str): the SPE key (CEG ANEEL)
- timestamp (datetime64[ns]): the timestamp of the data
- WindSpeed_30min.AVG (float): the wind speed in 30 min intervals (only applicable for wind SPEs)
- WindSpeedValid_30min.REP (float): the wind speed validity in 30 min intervals (only applicable for wind SPEs)
- Irradiance_30min.AVG (float): the irradiance in 30 min intervals (only applicable for solar SPEs)
- IrradianceValid_30min.REP (float): the irradiance validity in 30 min intervals (only applicable for solar SPEs)
- ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
- ActivePowerReferenceFinal_30min.AVG (float): the active power reference final in 30 min intervals
"""
# the files are available through AWS S3 in CSV, xlsx and parquet formats (all with the same name)
# each file contains data for one month
# the files are named as:
# - wind: Restricoes_coff_Eolicas_Detalhamento-YYYY-MM.xlsx (or .csv or .parquet)
# - solar: RESTRICAO_COFF_FOTOVOLTAICA_DETAIL_YYYY_MM.xlsx (or .csv or .parquet)
base_urls = {
"wind": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_eolica_detail_tm/RESTRICAO_COFF_EOLICA_DETAIL",
"solar": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_fotovoltaica_detail_tm/RESTRICAO_COFF_FOTOVOLTAICA_DETAIL",
}
# defining the months that we will need to download
get_period = period.copy()
get_period.start = get_period.start.replace(day=1)
get_periods = get_period.split_multiple(relativedelta(months=1))
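# e.g. a period from 2024-01-15 to 2024-03-10 is widened to start on 2024-01-01 and split into three
# one-month chunks (January, February and March 2024); one file is downloaded per chunk and spe_type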
# creating temporary folder
tmp_folder = Path("./tmp/")
tmp_folder.mkdir(exist_ok=True)
# iterating over the periods
dfs = []
for get_period in get_periods:
# iterating over the spe types
for spe_type, base_url in base_urls.items():
# creating the url (first lets try to get the parquet file, which is the fastest but not always available)
url = f"{base_url}_{get_period.start.strftime('%Y_%m')}.parquet"
try:
t0 = perf_counter()
logger.info(f"{spe_type} - {get_period.start.strftime('%Y-%m')} - trying to get parquet file at '{url}'")
file_name = tmp_folder / f"{spe_type}_{get_period.start.strftime('%Y_%m')}.parquet"
urlretrieve(url, file_name)
logger.info(f"Downloaded in {perf_counter() - t0:.2f}s to '{file_name}'")
except Exception as e:
url = f"{base_url}_{get_period.start.strftime('%Y_%m')}.csv"
if isinstance(e, HTTPError) and e.code == 404:
logger.warning(
f"{spe_type} - {get_period.start.strftime('%Y-%m')} - parquet file not found, trying to get csv file at '{url}'",
)
else:
logger.exception(
f"{spe_type} - {get_period.start.strftime('%Y-%m')} - could not get parquet file, trying to get csv file at '{url}'",
)
try:
t0 = perf_counter()
file_name = tmp_folder / f"{spe_type}_{get_period.start.strftime('%Y_%m')}.csv"
urlretrieve(url, file_name)
logger.info(f"Downloaded in {perf_counter() - t0:.2f}s to '{file_name}'")
except Exception as e2:
file_name = None
if isinstance(e2, HTTPError) and e2.code == 404:
logger.error(f"{spe_type} - {get_period.start.strftime('%Y-%m')} - csv file not found")
else:
logger.exception(
f"{spe_type} - {get_period.start.strftime('%Y-%m')} - could not get both parquet and csv files",
)
# if we could not get the file, we skip to the next iteration
if file_name is None:
continue
# reading the file
if file_name.suffix == ".parquet":
df = pd.read_parquet(file_name, engine="pyarrow", dtype_backend="pyarrow")
elif file_name.suffix == ".csv":
df = pd.read_csv(file_name, sep=";", decimal=".", engine="pyarrow", dtype_backend="pyarrow")
# forcing type casting to avoid errors
df = df.astype(
{
"id_subsistema": "string[pyarrow]",
"id_estado": "string[pyarrow]",
"nom_conjuntousina": "string[pyarrow]",
"nom_usina": "string[pyarrow]",
"id_ons": "string[pyarrow]",
"ceg": "string[pyarrow]",
"val_geracaoestimada": "double[pyarrow]",
"val_geracaoverificada": "double[pyarrow]",
},
)
if "val_irradianciaverificado" in df.columns:
df = df.astype({"val_irradianciaverificado": "double[pyarrow]", "flg_dadoirradianciainvalido": "double[pyarrow]"})
if "val_ventoverificado" in df.columns:
df = df.astype({"val_ventoverificado": "double[pyarrow]", "flg_dadoventoinvalido": "double[pyarrow]"})
# checking if din_instante is a string or a datetime, if a string we convert it to datetime
if df["din_instante"].dtype == "string":
df["din_instante"] = pd.to_datetime(df["din_instante"], format="%Y-%m-%d %H:%M:%S")
df["din_instante"] = df["din_instante"].astype("datetime64[s]")
# adding the DataFrame to the list
dfs.append(df)
# getting features from the database to rename
features = self._ons._perfdb.features.definitions.get( # noqa: SLF001
object_models=["ons_spe_general"],
data_source_types=["ons_open_data"],
output_type="DataFrame",
)
features.index = features.index.droplevel("object_model_name")
feature_rename = features.reset_index(drop=False).set_index("name_in_data_source")["name"].to_dict()
# concatenating the DataFrames
df = pd.concat(dfs, ignore_index=True)
# filtering the DataFrame to the period
df = df[(df["din_instante"] >= period.start) & (df["din_instante"] <= period.end)]
# dropping not used columns
df = df.drop(columns=["nom_modalidadeoperacao"])
# removing verifying digit from ceg
df["ceg"] = df["ceg"].str.replace(r"\.\d*$", "", regex=True)
# renaming columns for easier understanding
df = df.rename(
columns={
"id_subsistema": "subsystem_key",
"id_estado": "federation_state_key",
"nom_conjuntousina": "site_name",
"nom_usina": "spe_name",
"id_ons": "spe_ons_key",
"ceg": "spe_key",
"din_instante": "timestamp",
}
| feature_rename,
)
# removing temporary folder
rmtree(tmp_folder, ignore_errors=True)
return df
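A minimal sketch. The import path and constructor signature of DateTimeRange are assumptions; the source above only shows that it exposes .start, .end, .copy() and .split_multiple():
from datetime import datetime
import echo_ons as e_o
from echo_ons import DateTimeRange  # import path is an assumption
ons = e_o.Ons()  # constructor arguments assumed
period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1))  # signature assumed
limits_df = ons.opendata.spes.limitations.get(period)
print(limits_df[["spe_key", "timestamp", "ActivePowerReference_30min.AVG"]].head())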
import_database(period)¶
Imports the limitations data from ONS Open Data to the local database.
Parameters:
- period (DateTimeRange) – The period for which the data will be retrieved.
Source code in echo_ons/ons_opendata_spe_limitations.py
def import_database(self, period: DateTimeRange) -> None:
"""Imports the limitations data from ONS Open Data to the local database.
Parameters
----------
period : DateTimeRange
The period for which the data will be retrieved.
"""
# checking inputs
if not isinstance(period, DateTimeRange):
raise TypeError(f"period must be a DateTimeRange object. Got {type(period)}.")
# getting features from the database to rename
features = self._ons._perfdb.features.definitions.get( # noqa: SLF001
object_models=["ons_spe_general"],
data_source_types=["ons_open_data"],
output_type="DataFrame",
)
features.index = features.index.droplevel("object_model_name")
# getting the data
df = self.get(period)
# dropping the features that are not present in df (case when we are only getting wind or solar data)
features = features[features.index.isin(df.columns)].copy()
# keeping only the columns needed for the database
df = df[
[
"spe_key",
"timestamp",
*features.index.to_list(),
]
].copy()
# getting all the ONS Spes from the database to replace the spe_key by their name in the database
ons_spes = self._ons._perfdb.objects.instances.get(object_types=["ons_spe_general"], get_attributes=True, output_type="DataFrame") # noqa: SLF001
# removing the spes that do not have ons_spe_key attribute defined in the database
ons_spes = ons_spes[ons_spes["ons_spe_key"].notna()].copy()
# making a dictionary to replace the spe_key by the object_name
rename = ons_spes.reset_index(drop=False).set_index("ons_spe_key")["name"].to_dict()
# checking if all spe_keys are in the dictionary
missing_keys = set(df["spe_key"].unique()) - set(rename.keys())
if missing_keys:
logger.warning(f"No ONS spes found for the following keys. They will not be imported. {missing_keys}")
df = df[~df["spe_key"].isin(missing_keys)].copy()
# replacing the spe_key by the object_name
df["spe_key"] = df["spe_key"].map(rename)
# renaming the column
df = df.rename(columns={"spe_key": "object_name"})
# melting the DataFrame to have the feature_name as a column
df = df.melt(id_vars=["object_name", "timestamp"], var_name="feature_name", value_name="value")
# pivoting the DataFrame to have timestamp as the index and the columns as a MultiIndex with the object_name and feature_name as its levels
df = df.pivot_table(values="value", index="timestamp", columns=["object_name", "feature_name"])
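# e.g. the result is indexed by timestamp with one column per (object_name, feature_name) pair,
# such as ("ONS-SPE-EOL-Ventos Bons", "WindSpeed_30min.AVG") (hypothetical names)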
# inserting the data into the database
self._ons._perfdb.features.values.series.insert(df=df, on_conflict="update", bazefield_upload=False) # noqa: SLF001
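An end-to-end sketch for the limitations import, under the same DateTimeRange and Ons constructor assumptions as above:
from datetime import datetime
import echo_ons as e_o
from echo_ons import DateTimeRange  # import path is an assumption
ons = e_o.Ons()  # constructor arguments assumed
period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1))  # signature assumed
# downloads the monthly files overlapping the period and upserts the series into the database
ons.opendata.spes.limitations.import_database(period)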