Open Data - Sites
OnsOpenDataSites(ons)
Class used to get the sites from ONS Open Data. Can be accessed via ons.opendata.sites.
The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto
Parameters:
- ons (Ons) – Top level object carrying all functionality.
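A minimal access sketch (hedged: the import path is inferred from the module layout, and the Ons constructor arguments are not covered in this section, so the call below is a placeholder):
from echo_ons import Ons

ons = Ons(...)  # placeholder: real constructor arguments are not documented here
sites_df = ons.opendata.sites.get()               # all ONS sites as a pandas DataFrame
limitations_api = ons.opendata.sites.limitations  # 30 min limitation data (documented below)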
Source code in echo_ons/ons_opendata_sites.py
def __init__(self, ons: e_o.Ons) -> None:
"""Class used to get the sites from ONS Open Data. Can be accessed via `ons.opendata.sites`.
Parameters
----------
ons : Ons
Top level object carrying all functionality.
"""
super().__init__(ons)
# * subclasses
self.limitations = OnsOpenDataSiteLimitations(ons)
get(output_type='DataFrame')
Gets all the sites from ONS Open Data.
The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto
Parameters:
- output_type (Literal["DataFrame", "pl.DataFrame"], default: "DataFrame") – Type of output to return. Can be "DataFrame" or "pl.DataFrame". By default "DataFrame".
Returns:
- pd.DataFrame | pl.DataFrame – DataFrame containing the sites from ONS Open Data. The following columns are available:
- id_subsistema
- nom_subsistema
- estad_id
- nom_estado
- id_ons_conjunto
- nom_conjunto
Source code in echo_ons/ons_opendata_sites.py
@validate_call
def get(self, output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame") -> pd.DataFrame | pl.DataFrame:
"""Gets all the sites from ONS Open Data.
The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto
Parameters
----------
output_type : Literal["DataFrame", "pl.DataFrame"], optional
Type of output to return. Can be "DataFrame" or "pl.DataFrame". By default "DataFrame".
Returns
-------
pd.DataFrame | pl.DataFrame
DataFrame containing the sites from ONS Open Data. The following columns are available:
- id_subsistema
- nom_subsistema
- estad_id
- nom_estado
- id_ons_conjunto
- nom_conjunto
"""
df = self._get_ons_file()
# first, drop duplicates based only on id_ons_conjunto and nom_conjunto
df = df.unique(subset=["id_ons_conjunto", "nom_conjunto"])
# if duplicate names remain, append _2, _3, ... to the "nom_conjunto" column
df = df.with_columns(
(
pl.col("nom_conjunto").cast(pl.Utf8)
+ pl.col("nom_conjunto")
.cum_count()
.over("nom_conjunto")
.map_elements(
lambda x: "" if x == 1 else f"_{x}",
return_dtype=pl.Utf8,
)
).alias("nom_conjunto"),
)
# removing unwanted columns
df = df.drop(["id_tipousina", "id_ons_usina", "nom_usina", "ceg", "dat_iniciorelacionamento", "dat_fimrelacionamento"])
# dropping duplicates
df = df.unique()
if output_type == "DataFrame":
return df.to_pandas(use_pyarrow_extension_array=True)
return df
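The duplicate-name suffixing above can be hard to parse at a glance. Here it is isolated on invented toy data (a self-contained sketch; only the polars expression is taken from the source above):
import polars as pl

df = pl.DataFrame({"nom_conjunto": ["Conj. A", "Conj. B", "Conj. A"]})
# the first occurrence of a name is kept as-is; repeats get "_2", "_3", ...
df = df.with_columns(
    (
        pl.col("nom_conjunto").cast(pl.Utf8)
        + pl.col("nom_conjunto")
        .cum_count()
        .over("nom_conjunto")
        .map_elements(lambda x: "" if x == 1 else f"_{x}", return_dtype=pl.Utf8)
    ).alias("nom_conjunto"),
)
print(df["nom_conjunto"].to_list())  # ['Conj. A', 'Conj. B', 'Conj. A_2']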
get_related_spes(output_type='DataFrame')
Gets all the related SPEs from ONS Open Data.
The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto
Parameters:
- output_type (Literal["DataFrame", "pl.DataFrame"], default: "DataFrame") – Type of output to return. Can be "DataFrame" or "pl.DataFrame". By default "DataFrame".
Returns:
- pd.DataFrame | pl.DataFrame – DataFrame containing the related SPEs from ONS Open Data. The following columns are available:
- site_key
- site_name
- spe_key (also known as CEG in ANEEL data)
- spe_name
- relation_start
- relation_end
- federation_state
Source code in echo_ons/ons_opendata_sites.py
@validate_call
def get_related_spes(self, output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame") -> pd.DataFrame | pl.DataFrame:
"""Gets all the related SPEs from ONS Open Data.
The data is available at the following link: https://dados.ons.org.br/dataset/usina_conjunto
Parameters
----------
output_type : Literal["DataFrame", "pl.DataFrame"], optional
Type of output to return. Can be "DataFrame" or "pl.DataFrame". By default "DataFrame".
Returns
-------
pd.DataFrame | pl.DataFrame
DataFrame containing the related SPEs from ONS Open Data. The following columns are available:
- site_key
- site_name
- spe_key (also known as CEG in ANEEL data)
- spe_name
- relation_start
- relation_end
- federation_state
"""
df = self._get_ons_file()
# removing unwanted columns
df = df.drop(
[
"id_tipousina",
"id_ons_usina",
"id_subsistema",
"nom_subsistema",
"estad_id",
],
)
# renaming columns
df = df.rename(
{
"id_ons_conjunto": "site_key",
"nom_conjunto": "site_name",
"ceg": "spe_key",
"nom_usina": "spe_name",
"dat_iniciorelacionamento": "relation_start",
"dat_fimrelacionamento": "relation_end",
"nom_estado": "federation_state",
},
)
# getting correct names for the sites
sites_df = df[["site_key", "site_name"]].unique()
sites_df = sites_df.with_columns(
(
pl.col("site_name").cast(pl.Utf8)
+ pl.col("site_name").cum_count().over("site_name").map_elements(lambda x: "" if x == 1 else f"_{x}", return_dtype=pl.Utf8)
).alias("site_name"),
)
# remove "Conj. " from name
sites_df = sites_df.with_columns(
pl.col("site_name").str.replace("Conj. ", ""),
)
df = df.drop("site_name")
df = df.join(sites_df, on="site_key", how="left")
# removing the trailing ".XX" from spe_key (it's only a check digit and might cause problems when joining with ANEEL data)
df = df.with_columns(
pl.col("spe_key").str.replace(r"\.\d*$", ""),
)
df = df[["site_key", "site_name", "spe_key", "spe_name", "relation_start", "relation_end"]]
if output_type == "DataFrame":
return df.to_pandas(use_pyarrow_extension_array=True)
return df
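To make the check-digit handling concrete, here is the same regex applied to invented CEG-shaped codes (illustration only; these are not real CEG values):
import polars as pl

df = pl.DataFrame({"spe_key": ["EOL.CV.RN.031245-8.01", "UFV.CV.BA.040001-3.02"]})
# only the trailing ".NN" block (the check digit) is removed
df = df.with_columns(pl.col("spe_key").str.replace(r"\.\d*$", ""))
print(df["spe_key"].to_list())  # ['EOL.CV.RN.031245-8', 'UFV.CV.BA.040001-3']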
import_database(skip_existing=True)
Imports the sites from ONS Open Data to the database.
Parameters:
- skip_existing (bool, default: True) – If True, sites that already exist in the database are skipped; if False, all attributes of existing sites are overwritten. By default True.
Source code in echo_ons/ons_opendata_sites.py
@validate_call
def import_database(self, skip_existing: bool = True) -> None:
"""Imports the sites from ONS Open Data to the database.
Parameters
----------
skip_existing : bool, optional
If True, sites that already exist in the database are skipped; if False, all attributes of existing sites are overwritten. By default True.
"""
# getting the sites
df: pl.DataFrame = self.get(output_type="pl.DataFrame")
# getting all the existing sites
existing = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame") # noqa: SLF001
existing = pl.from_pandas(existing.reset_index(drop=False))
# adding column ons_site_key if it does not exist
if "ons_site_key" not in existing.columns:
existing = existing.with_columns(pl.lit(None).cast(pl.String).alias("ons_site_key"))
# if skip_existing is True, keep only the sites that do not yet exist
if skip_existing:
df = df.filter(~pl.col("id_ons_conjunto").is_in(existing["ons_site_key"]))
if df.is_empty():
logger.info("No new sites to import.")
return
# renaming columns
df = df.rename(
{
"id_ons_conjunto": "ons_site_key",
"nom_conjunto": "name",
"nom_subsistema": "ons_subsystem",
"nom_estado": "federation_state",
},
)
# remove "Conj. " from name
df = df.with_columns(
pl.col("name").str.replace("Conj. ", ""),
)
# adding "ONS-Site-" to name
df = df.with_columns(
("ONS-Site-" + pl.col("name")).alias("name"),
)
# first, add the objects that are not yet in the database
new_objects = df.filter(~pl.col("ons_site_key").is_in(existing["ons_site_key"]))
for row in new_objects[["name", "ons_site_key"]].iter_rows(named=True):
self._ons._perfdb.objects.instances.insert( # noqa: SLF001
object_name=row["name"],
object_model_name="ons_site_general",
description=f"""Site "{row["name"].replace("ONS-Site-", "")}" from ONS Open Data (created automatically)""",
on_conflict="ignore",
)
# now set exists_in_bazefield to False
self._ons._perfdb.objects.instances.attributes.insert( # noqa: SLF001
object_name=row["name"],
attribute_name="exists_in_bazefield",
attribute_value=False,
on_conflict="update",
)
# also set the ons_site_key attribute
self._ons._perfdb.objects.instances.attributes.insert( # noqa: SLF001
object_name=row["name"],
attribute_name="ons_site_key",
attribute_value=row["ons_site_key"],
on_conflict="update",
)
# getting all objects again to get the new ones
existing = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame") # noqa: SLF001
existing = pl.from_pandas(existing.reset_index(drop=False))
# creating a mapping from ons_site_key to object_name
ons_site_key_to_object_name = dict(zip(existing["ons_site_key"], existing["name"], strict=False))
# * related SPEs
related_spes = self.get_related_spes(output_type="pl.DataFrame")
# getting all spes from the database to create the mapping
spes = self._ons._perfdb.objects.instances.get(object_types=["ons_spe_general"], get_attributes=True, output_type="DataFrame") # noqa: SLF001
spes = pl.from_pandas(spes.reset_index(drop=False))
# merging the related_spes with the spes based on ons_spe_key to get the database name
related_spes = related_spes.join(
spes[["name", "ons_spe_key"]].rename({"name": "spe_db_name", "ons_spe_key": "spe_key"}),
on="spe_key",
how="left",
)
# merging the related_spes with the sites to get the object_name in the database
related_spes = related_spes.join(
existing[["name", "ons_site_key"]].rename({"name": "site_db_name", "ons_site_key": "site_key"}),
on="site_key",
how="left",
)
# dropping rows without spe_db_name or site_db_name
related_spes = related_spes.filter(~pl.col("spe_db_name").is_null() & ~pl.col("site_db_name").is_null())
# inserting on the database
related_spes = related_spes[["site_db_name", "spe_db_name", "relation_start", "relation_end"]].rename(
{"site_db_name": "ons_site_name", "spe_db_name": "ons_spe_name"},
)
self._ons._perfdb.ons.spes.sitemapping.insert(data=related_spes, on_conflict="update") # noqa: SLF001
# * coordinates and other SPE aggregate attributes
related_spes = related_spes.join(
spes[["name", "latitude", "longitude", "nominal_power", "connection_point", "ons_spe_type"]],
left_on="ons_spe_name",
right_on="name",
how="left",
)
# grouping by ons_site_name
site_spe_attrs_df = related_spes.group_by("ons_site_name").agg(
pl.col("ons_spe_type").mode().first().alias("ons_site_type"),
pl.col("latitude").mean().alias("latitude"),
pl.col("longitude").mean().alias("longitude"),
pl.col("nominal_power").sum().alias("nominal_power"),
pl.col("connection_point").mode().first().alias("connection_point"),
)
existing = existing[["ons_site_key", "name", "federation_state", "ons_subsystem"]].join(
site_spe_attrs_df.rename({"ons_site_name": "name"}),
on="name",
how="left",
)
# * data insertion
# then update the attributes
attrs = ["ons_subsystem", "federation_state", "latitude", "longitude", "nominal_power", "connection_point", "ons_site_type"]
for row in existing[["ons_site_key", *attrs]].iter_rows(named=True):
for attr in attrs:
if row[attr] is None:
logger.warning(f"Site {row['ons_site_key']} has attribute {attr} as None. Skipping.")
continue
self._ons._perfdb.objects.instances.attributes.insert( # noqa: SLF001
object_name=ons_site_key_to_object_name[row["ons_site_key"]],
attribute_name=attr,
attribute_value=row[attr],
on_conflict="update",
)
logger.info(f"Inserted {len(new_objects)} new sites. Updated {len(df) - len(new_objects)} sites.")
OnsOpenDataSiteLimitations(ons)
Class used to get limitation variables in 30 min intervals from ONS Open Data. Can be accessed via ons.opendata.sites.limitations.
The data is available at the following links:
- Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
- Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica
Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
ons : Ons
Top level object carrying all functionality and the connection handler.
"""
self._ons: e_o.Ons = ons
get(period, output_type='DataFrame')
Gets all limitation variables within the given period from ONS Open Data.
The data is available at the following links:
- Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
- Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica
Parameters:
- period (DateTimeRange) – The period for which the data will be retrieved.
- output_type (Literal["DataFrame", "pl.DataFrame"], default: "DataFrame") – The type of DataFrame to be returned. Can be either "DataFrame" for pandas DataFrame or "pl.DataFrame" for polars DataFrame.
Returns:
- pd.DataFrame | pl.DataFrame – DataFrame containing the limitation data from ONS Open Data. The following columns are available:
- subsystem_key (str): the subsystem key
- subsystem_name (str): the subsystem name
- federation_state_key (str): the federation state key
- federation_state_name (str): the federation state name
- site_name (str): the site name
- site_key (str): the site key
- timestamp (datetime64[ns]): the timestamp of the data
- ActivePowerVerified_30min.AVG (float): the verified active power in 30 min intervals
- ActivePowerLimited_30min.AVG (float): the limited active power in 30 min intervals
- ActivePowerAvailable_30min.AVG (float): the available active power in 30 min intervals
- ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
- ActivePowerReferenceFinal_30min.AVG (float): the final active power reference in 30 min intervals
- LimitationReason_30min.REP (str): the limitation reason in 30 min intervals
- LimitationOrigin_30min.REP (str): the limitation origin in 30 min intervals
Source code in echo_ons/ons_opendata_site_limitations.py
@validate_call
def get(self, period: DateTimeRange, output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame") -> pd.DataFrame | pl.DataFrame:
"""Gets all limitation variables within the given period from ONS Open Data.
The data is available at the following links:
- Wind sites: https://dados.ons.org.br/dataset/restricao_coff_eolica_usi
- Solar sites: https://dados.ons.org.br/dataset/restricao_coff_fotovoltaica
Parameters
----------
period : DateTimeRange
The period for which the data will be retrieved.
output_type : Literal["DataFrame", "pl.DataFrame"], default "DataFrame"
The type of DataFrame to be returned. Can be either "DataFrame" for pandas DataFrame or "pl.DataFrame" for polars DataFrame.
Returns
-------
pd.DataFrame | pl.DataFrame
DataFrame containing the limitation data from ONS Open Data. The following columns are available:
- subsystem_key (str): the subsystem key
- subsystem_name (str): the subsystem name
- federation_state_key (str): the federation state key
- federation_state_name (str): the federation state name
- site_name (str): the site name
- site_key (str): the site key
- timestamp (datetime64[ns]): the timestamp of the data
- ActivePowerVerified_30min.AVG (float): the active power verified in 30 min intervals
- ActivePowerLimited_30min.AVG (float): the active power limited in 30 min intervals
- ActivePowerAvailable_30min.AVG (float): the active power available in 30 min intervals
- ActivePowerReference_30min.AVG (float): the active power reference in 30 min intervals
- ActivePowerReferenceFinal_30min.AVG (float): the active power reference final in 30 min intervals
- LimitationReason_30min.REP (str): the limitation reason in 30 min intervals
- LimitationOrigin_30min.REP (str): the limitation origin in 30 min intervals
"""
# the files are available through AWS S3 in CSV, XLSX and parquet formats (all with the same base name)
# each file contains data for one month
# the files are named as:
# - wind: RESTRICAO_COFF_EOLICA_YYYY_MM.xlsx (or .csv or .parquet)
# - solar: RESTRICAO_COFF_FOTOVOLTAICA_YYYY_MM.xlsx (or .csv or .parquet)
base_urls = {
"wind": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_eolica_tm/RESTRICAO_COFF_EOLICA",
"solar": "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_fotovoltaica_tm/RESTRICAO_COFF_FOTOVOLTAICA",
}
# defining the months that we will need to download
get_period = period.copy()
get_period.start = get_period.start.replace(day=1)
get_periods = get_period.split_multiple(relativedelta(months=1))
# iterating over the periods
dfs = []
for get_period in get_periods:
# iterating over the site types
for site_type, base_url in base_urls.items():
# creating the url (first lets try to get the parquet file, which is the fastest but not always available)
url = f"{base_url}_{get_period.start.strftime('%Y_%m')}.parquet"
try:
t0 = perf_counter()
logger.info(f"{site_type} - {get_period.start.strftime('%Y-%m')} - trying to get parquet file at '{url}'")
df = pl.read_parquet(url)
logger.info(f"Downloaded in {perf_counter() - t0:.2f}s")
except Exception as e:
url = f"{base_url}_{get_period.start.strftime('%Y_%m')}.csv"
if isinstance(e, HTTPError) and e.code == 404:
logger.warning(
f"{site_type} - {get_period.start.strftime('%Y-%m')} - parquet file not found, trying to get csv file at '{url}'",
)
else:
logger.exception(
f"{site_type} - {get_period.start.strftime('%Y-%m')} - could not get parquet file, trying to get csv file at '{url}'",
)
try:
t0 = perf_counter()
df = pl.read_csv(url, separator=";", infer_schema_length=None)
logger.info(f"Downloaded in {perf_counter() - t0:.2f}s")
except Exception as e2:
df = None
if isinstance(e2, HTTPError) and e2.code == 404:
logger.error(f"{site_type} - {get_period.start.strftime('%Y-%m')} - csv file not found")
else:
logger.exception(
f"{site_type} - {get_period.start.strftime('%Y-%m')} - could not get both parquet and csv files",
)
# if we could not get the file, we skip to the next iteration
if df is None:
continue
# in case val_ columns are stored as strings, convert them to numbers ("," is the decimal separator, so swap it for "." before casting)
for col in df.select(pl.selectors.string()).columns:
if col.startswith("val_"):
df = df.with_columns(
pl.col(col).str.replace(",", ".").cast(pl.Float64, strict=False),
)
# checking if din_instante is a string or a datetime, if a string we convert it to datetime
if "din_instante" in df.select(pl.selectors.string()).columns:
df = df.with_columns(
pl.col("din_instante").str.strptime(pl.Datetime, fmt="%Y-%m-%d %H:%M:%S", strict=False),
)
# adding the DataFrame to the list
dfs.append(df)
# getting features from the database to rename
features = self._ons._perfdb.features.definitions.get( # noqa: SLF001
object_models=["ons_site_general"],
data_source_types=["ons_open_data"],
output_type="DataFrame",
)
features.index = features.index.droplevel("object_model_name")
feature_rename = features.reset_index(drop=False).set_index("name_in_data_source")["name"].to_dict()
# concatenating the DataFrames
df: pl.DataFrame = pl.concat(dfs, how="vertical")
# filtering the DataFrame to the period
df = df.filter(
(df["din_instante"] >= pl.lit(period.start)) & (df["din_instante"] <= pl.lit(period.end)),
)
# dropping not used columns
df = df.drop(["ceg"])
# renaming columns for easier understanding
df = df.rename(
{
"id_subsistema": "subsystem_key",
"nom_subsistema": "subsystem_name",
"id_estado": "federation_state_key",
"nom_estado": "federation_state_name",
"nom_usina": "site_name",
"id_ons": "site_key",
"din_instante": "timestamp",
}
| feature_rename,
)
if output_type == "DataFrame":
df = df.to_pandas(use_pyarrow_extension_array=True)
return df
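A hedged usage sketch: DateTimeRange comes from the library's own utilities, and its import path and constructor are assumed here rather than documented in this section. The second half reproduces the monthly parquet URL pattern described in the source comments:
from datetime import datetime

# assumed construction; the real DateTimeRange import/signature may differ
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 31, 23, 30))
lim_df = ons.opendata.sites.limitations.get(period, output_type="pl.DataFrame")

# the monthly file URL pattern used internally, reproduced for January 2024 (wind)
base = "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/restricao_coff_eolica_tm/RESTRICAO_COFF_EOLICA"
url = f"{base}_{datetime(2024, 1, 1).strftime('%Y_%m')}.parquet"
# -> ".../RESTRICAO_COFF_EOLICA_2024_01.parquet"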
import_database(period)
Imports the limitations data from ONS Open Data to the local database.
Parameters:
- period (DateTimeRange) – The period for which the data will be retrieved.
Source code in echo_ons/ons_opendata_site_limitations.py
@validate_call
def import_database(self, period: DateTimeRange) -> None:
"""Imports the limitations data from ONS Open Data to the local database.
Parameters
----------
period : DateTimeRange
The period for which the data will be retrieved.
"""
# getting features from the database to rename
features = self._ons._perfdb.features.definitions.get( # noqa: SLF001
object_models=["ons_site_general"],
data_source_types=["ons_open_data"],
output_type="DataFrame",
)
features.index = features.index.droplevel("object_model_name")
features = pl.from_pandas(features.reset_index(drop=False))
# getting the data
df = self.get(period, output_type="pl.DataFrame")
limitation_reason_feature = features.filter(pl.col("name_in_data_source") == "cod_razaorestricao").select("name").item()
limitation_origin_feature = features.filter(pl.col("name_in_data_source") == "cod_origemrestricao").select("name").item()
# mapping LimitationReason_30min.REP strings to integer codes (REL=1, CNF=2, ENE=3)
df = df.with_columns(
pl.col(limitation_reason_feature).replace(
{"REL": 1, "CNF": 2, "ENE": 3, "": None},
return_dtype=pl.Int64,
),
)
# mapping LimitationOrigin_30min.REP strings to integer codes (SIS=1, LOC=2)
df = df.with_columns(
pl.col(limitation_origin_feature).replace(
{"SIS": 1, "LOC": 2, "": None},
return_dtype=pl.Int64,
),
)
# keeping only the columns needed for the database
df = df[
[
"site_key",
"timestamp",
*features["name"].to_list(),
]
]
# getting all the ONS Sites from the database to replace the site_key by their name in the database
ons_sites = self._ons._perfdb.objects.instances.get(object_types=["ons_site_general"], get_attributes=True, output_type="DataFrame") # noqa: SLF001
ons_sites = pl.from_pandas(ons_sites.reset_index(drop=False))
# removing the sites that do not have ons_site_key attribute defined in the database
ons_sites = ons_sites.filter(pl.col("ons_site_key").is_not_null())
# making a dictionary to replace the site_key by the object_name
rename = ons_sites.select(["ons_site_key", "name"]).to_pandas().set_index("ons_site_key")["name"].to_dict()
# checking if all site_keys are in the dictionary
missing_keys = set(df["site_key"].unique()) - set(rename.keys())
if missing_keys:
logger.warning(f"No ONS sites found for the following keys. They will not be imported. {missing_keys}")
df = df.filter(~pl.col("site_key").is_in(list(missing_keys)))
# replacing the site_key by the object_name
df = df.with_columns(pl.col("site_key").replace(rename))
# renaming the column
df = df.rename({"site_key": "object_name"})
# unpivoting so every column except object_name and timestamp becomes a feature_name/value pair
df = df.unpivot(index=["object_name", "timestamp"], variable_name="feature_name", value_name="value")
# adding object_name as a prefix to feature_name
df = df.with_columns(
pl.concat_str([pl.col("object_name"), pl.lit("@"), pl.col("feature_name")]).alias("feature_name"),
)
# dropping object_name column
df = df.drop("object_name")
# pivoting the DataFrame to have timestamp as the index and the columns as feature_name
df = df.pivot(values="value", index="timestamp", on="feature_name", aggregate_function="first")
# inserting the data into the database
self._ons._perfdb.features.values.series.insert(df=df, on_conflict="update", bazefield_upload=False) # noqa: SLF001
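The unpivot-then-pivot reshape at the end is the crux of the import: the long per-(object, feature) rows are turned into one wide column per "object@feature" pair, indexed by timestamp. A self-contained sketch on invented data:
import polars as pl
from datetime import datetime

df = pl.DataFrame(
    {
        "object_name": ["ONS-Site-A", "ONS-Site-A"],
        "timestamp": [datetime(2024, 1, 1, 0, 0), datetime(2024, 1, 1, 0, 30)],
        "ActivePowerVerified_30min.AVG": [10.0, 12.0],
    }
)
# long format: one row per (object, timestamp, feature)
long = df.unpivot(index=["object_name", "timestamp"], variable_name="feature_name", value_name="value")
# prefix each feature with its object name, then drop the object column
long = long.with_columns(
    pl.concat_str([pl.col("object_name"), pl.lit("@"), pl.col("feature_name")]).alias("feature_name"),
).drop("object_name")
# wide format: timestamp index, one column per "object@feature"
wide = long.pivot(values="value", index="timestamp", on="feature_name", aggregate_function="first")
print(wide.columns)  # ['timestamp', 'ONS-Site-A@ActivePowerVerified_30min.AVG']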