Open Data - Sites¶
OnsOpenDataSites(ons)¶
Class used to get the sites from ONS Open Data. Can be accessed via ons.opendata.sites.
The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto
Parameters:
- ons (Ons) – Top level object carrying all functionality.
Source code in echo_ons/ons_opendata_sites.py
def __init__(self, ons: e_o.Ons) -> None:
    """Class used to get the sites from ONS Open Data. Can be accessed via `ons.opendata.sites`.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality.
    """
    super().__init__(ons)
    # * subclasses
    self.limitations = OnsOpenDataSiteLimitations(ons)
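A minimal usage sketch of how this class is reached. It is not part of the library source; the import path and the no-argument Ons constructor are assumptions, so adapt them to your setup:
# hedged sketch: assumes echo_ons exposes Ons at the package root and that it can be built without arguments
from echo_ons import Ons

ons = Ons()
sites = ons.opendata.sites  # OnsOpenDataSites instance
limitations = ons.opendata.sites.limitations  # OnsOpenDataSiteLimitations instance (documented below)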
get()¶
Gets all the sites from ONS Open Data.
The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto
Returns:
- DataFrame – DataFrame containing the sites from ONS Open Data. The following columns are available:
- id_subsistema
- nom_subsistema
- estad_id
- nom_estado
- id_ons_conjunto
- nom_conjunto
Source code in echo_ons/ons_opendata_sites.py
def get(self) -> pd.DataFrame:
    """Gets all the sites from ONS Open Data.

    The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto

    Returns
    -------
    pd.DataFrame
        DataFrame containing the sites from ONS Open Data. The following columns are available:

        - id_subsistema
        - nom_subsistema
        - estad_id
        - nom_estado
        - id_ons_conjunto
        - nom_conjunto
    """
    df = self._get_ons_file()
    # first let's drop duplicates based only on id_ons_conjunto and nom_conjunto
    df = df.drop_duplicates(subset=["id_ons_conjunto", "nom_conjunto"])
    # now in case there are duplicates let's add 2, 3, ... to the "nom_conjunto" column
    df["nom_conjunto"] = df["nom_conjunto"].astype("str") + df.groupby("nom_conjunto").cumcount().apply(
        lambda x: "" if x == 0 else f"_{x + 1}",
    )
    # removing unwanted columns
    df = df.drop(columns=["id_tipousina", "id_ons_usina", "nom_usina", "ceg", "dat_iniciorelacionamento", "dat_fimrelacionamento"])
    # dropping duplicates
    df = df.drop_duplicates()
    # resetting index
    df = df.reset_index(drop=True)
    return df
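The duplicate-handling step above can be tried in isolation. A small sketch with made-up site names showing how cumcount() turns repeated nom_conjunto values into suffixed ones:
import pandas as pd

# illustrative, made-up site names containing a duplicate
df = pd.DataFrame({"nom_conjunto": ["Conj. A", "Conj. A", "Conj. B"]})
df["nom_conjunto"] = df["nom_conjunto"].astype("str") + df.groupby("nom_conjunto").cumcount().apply(
    lambda x: "" if x == 0 else f"_{x + 1}",
)
# result: ["Conj. A", "Conj. A_2", "Conj. B"]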
get_related_spes()¶
Gets all the related SPEs from ONS Open Data.
The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto
Returns:
- DataFrame – DataFrame containing the related SPEs from ONS Open Data. The following columns are available:
- site_key
- site_name
- spe_key (also known as CEG in ANEEL data)
- spe_name
Source code in echo_ons/ons_opendata_sites.py
def get_related_spes(self) -> pd.DataFrame:
    """Gets all the related SPEs from ONS Open Data.

    The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto

    Returns
    -------
    pd.DataFrame
        DataFrame containing the related SPEs from ONS Open Data. The following columns are available:

        - site_key
        - site_name
        - spe_key (also known as CEG in ANEEL data)
        - spe_name
    """
    df = self._get_ons_file()
    # removing unwanted columns
    df = df.drop(
        columns=[
            "id_tipousina",
            "id_ons_usina",
            "dat_iniciorelacionamento",
            "dat_fimrelacionamento",
            "id_subsistema",
            "nom_subsistema",
            "estad_id",
            "nom_estado",
        ],
    )
    # renaming columns
    df = df.rename(columns={"id_ons_conjunto": "site_key", "nom_conjunto": "site_name", "ceg": "spe_key", "nom_usina": "spe_name"})
    # getting correct names for the sites
    sites_df = df[["site_key", "site_name"]].drop_duplicates()
    sites_df["site_name"] = sites_df["site_name"].astype("str") + sites_df.groupby("site_name").cumcount().apply(
        lambda x: "" if x == 0 else f"_{x + 1}",
    )
    # remove "Conj. " from name
    sites_df["site_name"] = sites_df["site_name"].str.replace("Conj. ", "")
    df = df.drop(columns=["site_name"])
    df = df.merge(sites_df, on="site_key", how="left")
    # removing the last ".XX" from spe_key (it is only a check digit and might cause problems when joining with ANEEL data)
    df["spe_key"] = df["spe_key"].str.replace(r"\.\d*$", "", regex=True)
    return df[["site_key", "site_name", "spe_key", "spe_name"]]
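The regex that strips the trailing check digit from spe_key can be tried on its own. The key below is a made-up CEG-style value, not real data:
import pandas as pd

spe_key = pd.Series(["EOL.CV.XX.000000-0.01"])  # hypothetical CEG-style key
spe_key = spe_key.str.replace(r"\.\d*$", "", regex=True)
# result: "EOL.CV.XX.000000-0" (the trailing ".01" is removed before joining with ANEEL data)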
import_database(skip_existing=True)¶
Imports the sites from ONS Open Data to the database.
Parameters:
- skip_existing (bool, default: True) – If set to False, all attributes of existing sites are overwritten; if True, sites that already exist in the database are skipped. By default True.
Source code in echo_ons/ons_opendata_sites.py
def import_database(self, skip_existing: bool = True) -> None:
    """Imports the sites from ONS Open Data to the database.

    Parameters
    ----------
    skip_existing : bool, optional
        If set to False, all attributes of existing sites are overwritten; if True, sites that already exist in the database are skipped. By default True.
    """
    # checking arguments
    if not isinstance(skip_existing, bool):
        raise ValueError("skip_existing must be a boolean.")
    # getting the sites
    df = self.get()
    # getting all the existing sites
    existing = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    # adding column ons_site_key if it does not exist
    if "ons_site_key" not in existing.columns:
        existing["ons_site_key"] = pd.NA
        existing["ons_site_key"] = existing["ons_site_key"].astype("string[pyarrow]")
    # getting only the non-existing sites if skip_existing is True
    if skip_existing:
        df = df[~df["id_ons_conjunto"].isin(existing["ons_site_key"])].copy()
        if df.empty:
            logger.info("No new sites to import.")
            return
    # renaming columns
    df = df.rename(
        columns={
            "id_ons_conjunto": "ons_site_key",
            "nom_conjunto": "name",
            "nom_subsistema": "ons_subsystem",
            "nom_estado": "federation_state",
        },
    )
    # remove "Conj. " from name
    df["name"] = df["name"].str.replace("Conj. ", "")
    # adding "ONS-Site-" to name
    df["name"] = "ONS-Site-" + df["name"]
    # first let's add the objects that are not in the database
    new_objects = df[~df["ons_site_key"].isin(existing["ons_site_key"])].copy()
    for row in new_objects[["name", "ons_site_key"]].to_dict(orient="records"):
        self._ons._perfdb.objects.instances.insert(  # noqa: SLF001
            object_name=row["name"],
            object_model_name="ons_site_general",
            description=f"""Site "{row["name"].replace("ONS-Site-", "")}" from ONS Open Data (created automatically)""",
            on_conflict="ignore",
        )
        # now let's set exists_in_bazefield to False
        self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
            object_name=row["name"],
            attribute_name="exists_in_bazefield",
            attribute_value=False,
            on_conflict="update",
        )
        # also defining the ons_site_key
        self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
            object_name=row["name"],
            attribute_name="ons_site_key",
            attribute_value=row["ons_site_key"],
            on_conflict="update",
        )
    # getting all objects again to get the new ones
    existing = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    # creating a mapping from ons_site_key to object_name
    ons_site_key_to_object_name = existing[["ons_site_key"]].reset_index(drop=False).set_index("ons_site_key")["name"].to_dict()
    # * related SPEs
    # finding the related SPEs
    related_spes = self.get_related_spes()
    # getting all SPEs from the database to create the mapping
    spes = self._ons._perfdb.objects.instances.get(object_types=["ons_spe_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    spes = (
        spes.reset_index(drop=False)
        .rename(columns={"name": "spe_name_db", "ons_spe_key": "spe_key"})[
            ["spe_name_db", "spe_key", "latitude", "longitude", "nominal_power", "connection_point"]
        ]
        .copy()
    )
    # merging the related_spes with the spes to get the ons_spe_key
    related_spes = related_spes.merge(spes, left_on="spe_key", right_on="spe_key", how="left")
    # merging the related_spes with the sites to get the object_name in the database
    sites_db = (
        existing.reset_index(drop=False)
        .rename(columns={"name": "site_name_db", "ons_site_key": "site_key"})[["site_name_db", "site_key"]]
        .copy()
    )
    related_spes = related_spes.merge(sites_db, left_on="site_key", right_on="site_key", how="left")
    # dropping rows where spe_name_db is NaN
    related_spes = related_spes[~related_spes["spe_name_db"].isna()].copy()
    # now let's group by site_name_db to create a list of spe_name_db for each site_name_db
    related_spes = (
        related_spes[["site_name_db", "spe_name_db", "latitude", "longitude", "nominal_power", "connection_point"]]
        .astype({"spe_name_db": "string"})
        .groupby("site_name_db")
        .agg(list)
    )
    # now let's merge the related_spes with df to use it as the ons_spes attribute
    df = df.merge(related_spes.rename(columns={"spe_name_db": "ons_spes"}), left_on="name", right_index=True, how="left")
    # * coordinates and other SPE aggregate attributes
    # getting latitude and longitude as the average of these attributes from the related_spes
    df["latitude"] = df["latitude"].apply(lambda x: mean(x) if isinstance(x, list) else pd.NA)
    df["longitude"] = df["longitude"].apply(lambda x: mean(x) if isinstance(x, list) else pd.NA)
    # nominal power
    df["nominal_power"] = df["nominal_power"].apply(lambda x: sum(x) if isinstance(x, list) else pd.NA)
    # connection point (first non-NaN value in the list)
    df["connection_point"] = df["connection_point"].apply(
        lambda x: next((i for i in x if not pd.isna(i)), pd.NA) if isinstance(x, list) else pd.NA,
    )
    df = df.astype({"latitude": "float64", "longitude": "float64", "nominal_power": "float64", "connection_point": "string[pyarrow]"})
    # * data insertion
    # then let's update the attributes
    attrs = ["ons_subsystem", "federation_state", "ons_spes", "latitude", "longitude", "nominal_power", "connection_point"]
    for row in df[["ons_site_key", *attrs]].to_dict(orient="records"):
        for attr in attrs:
            if not isinstance(row[attr], list) and pd.isna(row[attr]):
                logger.warning(f"Site {row['ons_site_key']} has attribute {attr} as NaN. Skipping.")
                continue
            self._ons._perfdb.objects.instances.attributes.insert(  # noqa: SLF001
                object_name=ons_site_key_to_object_name[row["ons_site_key"]],
                attribute_name=attr,
                attribute_value=row[attr],
                on_conflict="update",
            )
    logger.info(f"Inserted {len(new_objects)} new sites. Updated {len(df) - len(new_objects)} sites.")
OnsOpenDataSiteLimitations(ons)¶
Class used to get limitation variables in 30 min intervals from ONS Open Data. Can be accessed via ons.opendata.sites.limitations.
The data is available at the following links:
- Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
- Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica
Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
    """Base class that all subclasses should inherit from.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality and the connection handler.
    """
    # check inputs
    if not isinstance(ons, e_o.Ons):
        raise ValueError(f"ons must be of type Ons, not {type(ons)}")
    self._ons: e_o.Ons = ons
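A hedged usage sketch of the class. The DateTimeRange constructor signature and import are assumptions (this page only shows that it exposes start, end, copy() and split_multiple()); check the project's actual helper before using:
from datetime import datetime

# assumptions: `ons` is an existing Ons instance and DateTimeRange(start, end) is the
# project's datetime-range helper (its import is omitted here because it is not shown on this page)
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 2, 1))
limitations_df = ons.opendata.sites.limitations.get(period)
print(limitations_df[["site_key", "timestamp", "ActivePowerVerified_30min.AVG"]].head())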
get(period)¶
Gets all limitation variables within the given period from ONS Open Data.
The data is available at the following links:
- Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
- Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica
Parameters:
- period (DateTimeRange) – The period for which the data will be retrieved.
Returns:
- DataFrame – DataFrame containing the limitation data from ONS Open Data. The following columns are available:
- subsystem_key (str): the subsystem key
- subsystem_name (str): the subsystem name
- federation_state_key (str): the federation state key
- federation_state_name (str): the federation state name
- site_name (str): the site name
- site_key (str): the site key
- timestamp (datetime64[ns]): the timestamp of the data
- ActivePowerVerified_30min.AVG (float): the active power verified in 30 min intervals
- ActivePowerLimited_30min.AVG (float): the active power limited in 30 min intervals
- ActivePowerAvailable_30min.AVG (float): the active power available in 30 min intervals
- ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
- ActivePowerReferenceFinal_30min.AVG (float): the final active power reference in 30 min intervals
- LimitationReason_30min.REP (str): the limitation reason in 30 min intervals
- LimitationOrigin_30min.REP (str): the limitation origin in 30 min intervals
Source code in echo_ons/ons_opendata_site_limitations.py
def get(self, period: DateTimeRange) -> pd.DataFrame:
    """Gets all limitation variables within the given period from ONS Open Data.

    The data is available at the following links:

    - Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
    - Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the limitation data from ONS Open Data. The following columns are available:

        - subsystem_key (str): the subsystem key
        - subsystem_name (str): the subsystem name
        - federation_state_key (str): the federation state key
        - federation_state_name (str): the federation state name
        - site_name (str): the site name
        - site_key (str): the site key
        - timestamp (datetime64[ns]): the timestamp of the data
        - ActivePowerVerified_30min.AVG (float): the active power verified in 30 min intervals
        - ActivePowerLimited_30min.AVG (float): the active power limited in 30 min intervals
        - ActivePowerAvailable_30min.AVG (float): the active power available in 30 min intervals
        - ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
        - ActivePowerReferenceFinal_30min.AVG (float): the final active power reference in 30 min intervals
        - LimitationReason_30min.REP (str): the limitation reason in 30 min intervals
        - LimitationOrigin_30min.REP (str): the limitation origin in 30 min intervals
    """
    # the files are available through AWS S3 in CSV, xlsx and parquet formats (all with the same name)
    # each file contains data for one month
    # the files are named as:
    # - wind: RESTRICAO_COFF_EOLICA_YYYY_MM.xlsx (or .csv or .parquet)
    # - solar: RESTRICAO_COFF_FOTOVOLTAICA_YYYY_MM.xlsx (or .csv or .parquet)
    base_urls = {
        "wind": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_eolica_tm/RESTRICAO_COFF_EOLICA",
        "solar": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_fotovoltaica_tm/RESTRICAO_COFF_FOTOVOLTAICA",
    }
    # defining the months that we will need to download
    get_period = period.copy()
    get_period.start = get_period.start.replace(day=1)
    get_periods = get_period.split_multiple(relativedelta(months=1))
    # creating temporary folder
    tmp_folder = Path("./tmp/")
    tmp_folder.mkdir(exist_ok=True)
    # iterating over the periods
    dfs = []
    for get_period in get_periods:
        # iterating over the site types
        for site_type, base_url in base_urls.items():
            # creating the url (first let's try to get the parquet file, which is the fastest but not always available)
            url = f"{base_url}_{get_period.start.strftime('%Y_%m')}.parquet"
            try:
                t0 = perf_counter()
                logger.info(f"{site_type} - {get_period.start.strftime('%Y-%m')} - trying to get parquet file at '{url}'")
                file_name = tmp_folder / f"{site_type}_{get_period.start.strftime('%Y_%m')}.parquet"
                urlretrieve(url, file_name)
                logger.info(f"Downloaded in {perf_counter() - t0:.2f}s to '{file_name}'")
            except Exception as e:
                url = f"{base_url}_{get_period.start.strftime('%Y_%m')}.csv"
                if isinstance(e, HTTPError) and e.code == 404:
                    logger.warning(
                        f"{site_type} - {get_period.start.strftime('%Y-%m')} - parquet file not found, trying to get csv file at '{url}'",
                    )
                else:
                    logger.exception(
                        f"{site_type} - {get_period.start.strftime('%Y-%m')} - could not get parquet file, trying to get csv file at '{url}'",
                    )
                try:
                    t0 = perf_counter()
                    file_name = tmp_folder / f"{site_type}_{get_period.start.strftime('%Y_%m')}.csv"
                    urlretrieve(url, file_name)
                    logger.info(f"Downloaded in {perf_counter() - t0:.2f}s to '{file_name}'")
                except Exception as e2:
                    file_name = None
                    if isinstance(e2, HTTPError) and e2.code == 404:
                        logger.error(f"{site_type} - {get_period.start.strftime('%Y-%m')} - csv file not found")
                    else:
                        logger.exception(
                            f"{site_type} - {get_period.start.strftime('%Y-%m')} - could not get both parquet and csv files",
                        )
            # if we could not get the file, we skip to the next iteration
            if file_name is None:
                continue
            # reading the file
            if file_name.suffix == ".parquet":
                df = pd.read_parquet(file_name, engine="pyarrow", dtype_backend="pyarrow")
            elif file_name.suffix == ".csv":
                df = pd.read_csv(file_name, sep=";", decimal=".", engine="pyarrow", dtype_backend="pyarrow")
            # forcing type casting to avoid errors
            df = df.astype(
                {
                    "id_subsistema": "string[pyarrow]",
                    "nom_subsistema": "string[pyarrow]",
                    "id_estado": "string[pyarrow]",
                    "nom_estado": "string[pyarrow]",
                    "nom_usina": "string[pyarrow]",
                    "id_ons": "string[pyarrow]",
                    "ceg": "string[pyarrow]",
                    "val_geracao": "double[pyarrow]",
                    "val_geracaolimitada": "double[pyarrow]",
                    "val_disponibilidade": "double[pyarrow]",
                    "val_geracaoreferencia": "double[pyarrow]",
                    "val_geracaoreferenciafinal": "double[pyarrow]",
                    "cod_razaorestricao": "string[pyarrow]",
                    "cod_origemrestricao": "string[pyarrow]",
                },
            )
            # checking if din_instante is a string or a datetime; if a string, we convert it to datetime
            if df["din_instante"].dtype == "string":
                df["din_instante"] = pd.to_datetime(df["din_instante"], format="%Y-%m-%d %H:%M:%S")
            df["din_instante"] = df["din_instante"].astype("datetime64[s]")
            # adding the DataFrame to the list
            dfs.append(df)
    # getting features from the database to rename
    features = self._ons._perfdb.features.definitions.get(  # noqa: SLF001
        object_models=["ons_site_general"],
        data_source_types=["ons_open_data"],
        output_type="DataFrame",
    )
    features.index = features.index.droplevel("object_model_name")
    feature_rename = features.reset_index(drop=False).set_index("name_in_data_source")["name"].to_dict()
    # concatenating the DataFrames
    df = pd.concat(dfs, ignore_index=True)
    # filtering the DataFrame to the period
    df = df[(df["din_instante"] >= period.start) & (df["din_instante"] <= period.end)]
    # dropping unused columns
    df = df.drop(columns=["ceg"])
    # renaming columns for easier understanding
    df = df.rename(
        columns={
            "id_subsistema": "subsystem_key",
            "nom_subsistema": "subsystem_name",
            "id_estado": "federation_state_key",
            "nom_estado": "federation_state_name",
            "nom_usina": "site_name",
            "id_ons": "site_key",
            "din_instante": "timestamp",
        }
        | feature_rename,
    )
    # removing temporary folder
    rmtree(tmp_folder, ignore_errors=True)
    return df
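To make the download step above concrete, here is a rough sketch of which monthly parquet URLs get() would try for a given period, using dateutil directly instead of the project's DateTimeRange helper; the fallback to .csv on a 404 is omitted:
from datetime import datetime

from dateutil.relativedelta import relativedelta

base = "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_eolica_tm/RESTRICAO_COFF_EOLICA"
start, end = datetime(2024, 1, 15), datetime(2024, 3, 10)

# the period start is floored to the first day of the month, then split month by month
month = start.replace(day=1)
urls = []
while month <= end:
    urls.append(f"{base}_{month.strftime('%Y_%m')}.parquet")
    month += relativedelta(months=1)
# -> RESTRICAO_COFF_EOLICA_2024_01.parquet, ..._2024_02.parquet, ..._2024_03.parquet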
import_database(period)¶
Imports the limitations data from ONS Open Data to the local database.
Parameters:
- period (DateTimeRange) – The period for which the data will be retrieved.
Source code in echo_ons/ons_opendata_site_limitations.py
def import_database(self, period: DateTimeRange) -> None:
    """Imports the limitations data from ONS Open Data to the local database.

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.
    """
    # checking inputs
    if not isinstance(period, DateTimeRange):
        raise TypeError(f"period must be a DateTimeRange object. Got {type(period)}.")
    # getting features from the database to rename
    features = self._ons._perfdb.features.definitions.get(  # noqa: SLF001
        object_models=["ons_site_general"],
        data_source_types=["ons_open_data"],
        output_type="DataFrame",
    )
    features.index = features.index.droplevel("object_model_name")
    # getting the data
    df = self.get(period)
    limitation_reason_feature = features[features["name_in_data_source"] == "cod_razaorestricao"].index[0]
    limitation_origin_feature = features[features["name_in_data_source"] == "cod_origemrestricao"].index[0]
    # replacing values in LimitationReason_30min.REP considering 1=REL, 2=CNF, 3=ENE
    df[limitation_reason_feature] = df[limitation_reason_feature].map({"REL": 1, "CNF": 2, "ENE": 3})
    # replacing values in LimitationOrigin_30min.REP 1=SIS, 2=LOC
    df[limitation_origin_feature] = df[limitation_origin_feature].map({"SIS": 1, "LOC": 2})
    df = df.astype({limitation_reason_feature: "double[pyarrow]", limitation_origin_feature: "double[pyarrow]"})
    # keeping only the columns needed for the database
    df = df[
        [
            "site_key",
            "timestamp",
            *features.index.to_list(),
        ]
    ].copy()
    # getting all the ONS Sites from the database to replace the site_key by their name in the database
    ons_sites = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame")  # noqa: SLF001
    # removing the sites that do not have ons_site_key attribute defined in the database
    ons_sites = ons_sites[ons_sites["ons_site_key"].notna()].copy()
    # making a dictionary to replace the site_key by the object_name
    rename = ons_sites.reset_index(drop=False).set_index("ons_site_key")["name"].to_dict()
    # checking if all site_keys are in the dictionary
    missing_keys = set(df["site_key"].unique()) - set(rename.keys())
    if missing_keys:
        logger.warning(f"No ONS sites found for the following keys. They will not be imported. {missing_keys}")
        df = df[~df["site_key"].isin(missing_keys)].copy()
    # replacing the site_key by the object_name
    df["site_key"] = df["site_key"].map(rename)
    # renaming the column
    df = df.rename(columns={"site_key": "object_name"})
    # melting the DataFrame to have the feature_name as a column
    df = df.melt(id_vars=["object_name", "timestamp"], var_name="feature_name", value_name="value")
    # pivoting the DataFrame to have timestamp as the index and the columns as a MultiIndex with the object_name and feature_name as its levels
    df = df.pivot_table(values="value", index="timestamp", columns=["object_name", "feature_name"])
    # inserting the data into the database
    self._ons._perfdb.features.values.series.insert(df=df, on_conflict="update", bazefield_upload=False)  # noqa: SLF001
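The melt/pivot step at the end can be illustrated with a tiny made-up frame, showing how the data ends up as timestamp rows with an (object_name, feature_name) column MultiIndex before insertion:
import pandas as pd

# made-up wide frame: one column per feature, as returned by get() after site_key is mapped to object_name
df = pd.DataFrame(
    {
        "object_name": ["ONS-Site-Example"],
        "timestamp": pd.to_datetime(["2024-01-01 00:00"]),
        "ActivePowerVerified_30min.AVG": [10.0],
        "ActivePowerLimited_30min.AVG": [2.5],
    }
)
long = df.melt(id_vars=["object_name", "timestamp"], var_name="feature_name", value_name="value")
wide = long.pivot_table(values="value", index="timestamp", columns=["object_name", "feature_name"])
# wide.columns is a MultiIndex of (object_name, feature_name); wide.index is the timestamp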