Sager - Status¶
OnsSagerStatus(ons)
¶
Class used for handling ONS Sager status of limitation events. Can be accessed via ons.sager.status.
Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
    """Store a reference to the top-level ``Ons`` object.

    Parameters
    ----------
    ons : Ons
        Top level object carrying all functionality and the connection handler.
    """
    # Keep a handle on the root object so this handler can reach the
    # connection and the performance database through it.
    self._ons: e_o.Ons = ons
get(period, site_names, output_type='DataFrame')
¶
Gets the status of limitation events for the specified period and sites.
The result has the following columns: - site_name: Name of the site (object) as in the performance database - dataOcorrenciaEvento - fonte - nomeStatus - agente - ehAgenteFavorito - idAgente - codigoAgente - idFonte - usinaConjunto - idUsinaConjunto - idEvento - temRestricao - apresentarVisaoDetalhada - apresentarRedisponibilizar - ehConjunto
Parameters:
- period (DateTimeRange) – The time period to retrieve data for.
- site_names (list[str] | None) – The names of the sites to retrieve data for. This name must match the object name in the database.
  If set to None, all sites in the database will be requested.
- output_type (Literal['DataFrame', 'pl.DataFrame'], default: 'DataFrame') – Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"].
  By default "DataFrame".
Returns:
- pd.DataFrame | pl.DataFrame – DataFrame containing the status of limitation events. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.
- ErrorSummary – A summary of errors encountered during the retrieval process.
Source code in echo_ons/ons_sager_status.py
@validate_call
def get(
    self,
    period: DateTimeRange,
    site_names: list[str] | None,
    output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> tuple[pd.DataFrame | pl.DataFrame, ErrorSummary]:
    """Gets the status of limitation events for the specified period and sites.

    The result has the following columns:

    - site_name: Name of the site (object) as in the performance database
    - dataOcorrenciaEvento
    - fonte
    - nomeStatus
    - agente
    - ehAgenteFavorito
    - idAgente
    - codigoAgente
    - idFonte
    - usinaConjunto
    - idUsinaConjunto
    - idEvento
    - temRestricao
    - apresentarVisaoDetalhada
    - apresentarRedisponibilizar
    - ehConjunto

    Parameters
    ----------
    period : DateTimeRange
        The time period to retrieve data for.
    site_names : list[str] | None
        The names of the sites to retrieve data for. This name must match the object name in the database.
        If set to None, all sites in the database will be requested.
    output_type : Literal["DataFrame", "pl.DataFrame"], optional
        Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]
        By default "DataFrame"

    Returns
    -------
    pd.DataFrame | pl.DataFrame
        DataFrame containing the status of limitation events. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.
    ErrorSummary
        A summary of errors encountered during the retrieval process.

    Raises
    ------
    ValueError
        If no sites are found with the specified names, or if none of the
        found sites carries a usable 'ons_site_id' attribute.
    """
    # collects per-site problems without aborting the whole request
    error_summary = ErrorSummary(name="OnsSagerStatus")
    # resolving site names to their ONS ids via the performance database
    ons_sites = self._ons._perfdb.objects.instances.get(  # noqa: SLF001
        object_names=site_names,
        object_types=["ons_site"],
        get_attributes=True,
        attribute_names=["ons_site_id"],
    )
    if not ons_sites:
        raise ValueError(f"No sites found with the specified names: {site_names}")
    # keeping only sites that have an 'ons_site_id'; the rest are recorded as errors
    site_list = []
    for site_name, site_data in ons_sites.items():
        if "ons_site_id" not in site_data or not site_data["ons_site_id"]:
            logger.error(f"Site '{site_name}' does not have 'ons_site_id' attribute. It will be skipped.")
            # creating error object
            error_obj = ErrorObject(
                name=site_name,
                exceptions=[ValueError(f"Site '{site_name}' does not have 'ons_site_id' attribute.")],
            )
            error_summary.add_child(error_obj)
            continue
        site_list.append(site_data["ons_site_id"])
    site_list = sorted(set(site_list))
    # NOTE(review): an empty list in the other payload filters appears to mean
    # "no filter", so sending an empty idsConjuntos would presumably fetch every
    # conjunto — fail fast instead of silently requesting everything.
    if not site_list:
        raise ValueError(f"None of the requested sites has a valid 'ons_site_id' attribute: {site_names}")
    # paging through the SAGER API until an empty page is returned
    page = 0
    all_events = []
    while True:
        endpoint = "ConsultarEventos/obterEventosGeral"
        payload = {
            "dataOcorrenciaInicio": f"{period.start:%Y-%m-%d}T03:00:00.000Z",
            "dataOcorrenciaFim": f"{period.end:%Y-%m-%d}T03:00:00.000Z",
            "apenasAgentesFavoritos": False,
            "idsStatus": [],
            "idsFontes": [],
            "idsAgentes": [],
            "idsUsinas": [],
            "idsConjuntos": site_list,
            "ehExportacao": False,
            "pagina": page,
        }
        result = self._ons.sager.conn.post(endpoint=endpoint, json=payload)
        self._handle_http_errors(result)
        result = result.json()
        # an empty page marks the end of the pagination
        if len(result["content"]["listaObjetos"]) == 0:
            break
        all_events.extend(result["content"]["listaObjetos"])
        page += 1
    # explicit schema keeps dtypes stable even when all_events is empty
    df = pl.from_dicts(
        all_events,
        schema={
            "dataOcorrenciaEvento": pl.String,
            "fonte": pl.String,
            "nomeStatus": pl.String,
            "agente": pl.String,
            "ehAgenteFavorito": pl.Boolean,
            "idAgente": pl.Int32,
            "codigoAgente": pl.String,
            "idFonte": pl.Int32,
            "usinaConjunto": pl.String,
            "idUsinaConjunto": pl.Int32,
            "idEvento": pl.Int32,
            "temRestricao": pl.Boolean,
            "apresentarVisaoDetalhada": pl.Boolean,
            "apresentarRedisponibilizar": pl.Boolean,
            "ehConjunto": pl.Boolean,
        },
    )
    # converting date to datetime without time zone
    df = df.with_columns(
        [
            pl.col("dataOcorrenciaEvento").str.strptime(pl.Datetime, "%d/%m/%Y"),
        ],
    )
    # mapping idUsinaConjunto to site_name; sites skipped above may lack the
    # 'ons_site_id' key entirely, so they must be excluded from the mapping
    # (a plain v["ons_site_id"] lookup would raise KeyError for them)
    df = df.with_columns(
        [
            pl.col("idUsinaConjunto")
            .replace_strict(
                {v["ons_site_id"]: k for k, v in ons_sites.items() if v.get("ons_site_id")},
                default=None,
            )
            .alias("site_name"),
        ],
    )
    # sorting by usinaConjunto and dataOcorrenciaEvento
    df = df.sort(["usinaConjunto", "dataOcorrenciaEvento"])
    if output_type == "pl.DataFrame":
        return df, error_summary
    return df.to_pandas(use_pyarrow_extension_array=True), error_summary
import_database(period, site_names=None)
¶
Imports the limitations status to the performance database
Parameters:
- period (DateTimeRange) – The period for which the data will be retrieved.
- site_names (list[str] | None, default: None) – The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.
  By default None.
Returns:
- ErrorSummary – A summary of errors encountered during the import process.
Source code in echo_ons/ons_sager_status.py
@validate_call
def import_database(self, period: DateTimeRange, site_names: list[str] | None = None) -> ErrorSummary:
    """Imports the limitations status to the performance database

    Parameters
    ----------
    period : DateTimeRange
        The period for which the data will be retrieved.
    site_names : list[str] | None, optional
        The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.
        By default None.

    Returns
    -------
    ErrorSummary
        A summary of errors encountered during the import process.
    """
    # every requested site must be attached to the ons_sager data source
    if site_names:
        unknown = set(site_names) - set(self._ons.sager.data_source_objects)
        if unknown:
            raise ValueError(f"Requested site names not connected to data source ons_sager: {unknown}")
    else:
        site_names = self._ons.sager.data_source_objects
    # fetching the events as a Polars frame
    df, error_summary = self.get(period=period, site_names=site_names, output_type="pl.DataFrame")
    # mapping object names to their database ids
    obj_ids = self._ons._perfdb.objects.instances.get_ids(object_names=site_names)  # noqa: SLF001
    # shaping the frame into (object_id, date, status, has_limitations):
    # keep the relevant columns, resolve site_name -> object_id, rename to the
    # target table's column names, truncate datetime to date, drop the helper
    # column and default missing statuses to "Desconhecido"
    df = (
        df.select(["site_name", "dataOcorrenciaEvento", "nomeStatus", "temRestricao"])
        .with_columns(
            pl.col("site_name").replace_strict(obj_ids, default=None).alias("object_id"),
        )
        .rename(
            {
                "dataOcorrenciaEvento": "date",
                "nomeStatus": "status",
                "temRestricao": "has_limitations",
            },
        )
        .with_columns(pl.col("date").cast(pl.Date))
        .drop("site_name")
        .with_columns(pl.col("status").fill_null("Desconhecido"))
    )
    # writing the result to postgres
    with self._ons._perfdb.conn.reconnect() as conn:  # noqa: SLF001
        logger.info(f"Inserting {len(df)} rows in 'ons_data_validation'")
        conn.polars_to_sql(df=df, table_name="ons_data_validation", if_exists="update", schema="performance")
    return error_summary