Sager - Time Series
OnsSagerTimeseries(ons)
Class used for handling ONS Sager timeseries data. Can be accessed via ons.sager.timeseries.
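The class is not meant to be constructed directly. A minimal access sketch (the `Ons` constructor call shown here is hypothetical; see the `Ons` docs for its real arguments):

```python
import echo_ons as e_o

ons = e_o.Ons()  # hypothetical constructor call
timeseries = ons.sager.timeseries  # an OnsSagerTimeseries instance
```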
Source code in echo_ons/ons_root.py
def __init__(self, ons: e_o.Ons) -> None:
"""Base class that all subclasses should inherit from.
Parameters
----------
ons : Ons
Top level object carrying all functionality and the connection handler.
"""
self._ons: e_o.Ons = ons
get(period, site_names, output_type='DataFrame')
Gets the timeseries data for the specified period and sites.
The result has the following columns:

- site_name: Name of the site (object) as in the performance database
- ons_site_id: Id of the site in ONS systems
- timestamp_start
- timestamp_end
- disponibilidade
- geracaoLimitada
- geracaoReferencia
- geracaoReferenciaFinal
- geracaoVerificada
Parameters:

- period (DateTimeRange) – The time period to retrieve data for.
- site_names (list[str] | None) – The names of the sites to retrieve data for. This name must match the object name in the database. If set to None, all sites in the database will be requested.
- output_type (Literal["DataFrame", "pl.DataFrame"], default: "DataFrame") – Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]. By default "DataFrame".
Returns:

- pd.DataFrame | pl.DataFrame – DataFrame containing the timeseries data. If output_type is "pl.DataFrame", a Polars DataFrame will be returned; otherwise a pandas DataFrame.
- ErrorSummary – A summary of errors encountered during the retrieval process.
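A hedged usage sketch, assuming `ons` is the client from the access sketch above and that `DateTimeRange` can be built from a start and end datetime (its real constructor may differ):

```python
from datetime import datetime

# Hypothetical DateTimeRange construction; check its docs for the actual API.
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 3))

df, errors = ons.sager.timeseries.get(
    period=period,
    site_names=["my_site"],      # must match object names in the performance database
    output_type="pl.DataFrame",  # or the default "DataFrame" for pandas
)
```

Per-site failures are collected in the returned ErrorSummary rather than raised, so inspect it before assuming the frame is complete.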
Source code in echo_ons/ons_sager_timeseries.py
@validate_call
def get(
self,
period: DateTimeRange,
site_names: list[str] | None,
output_type: Literal["DataFrame", "pl.DataFrame"] = "DataFrame",
) -> tuple[pd.DataFrame | pl.DataFrame, ErrorSummary]:
"""Gets the timeseries data for the specified period and sites.
The result has the following columns:
- site_name: Name of the site (object) as in the performance database
- ons_site_id: Id of the site in ONS systems
- timestamp_start
- timestamp_end
- disponibilidade
- geracaoLimitada
- geracaoReferencia
- geracaoReferenciaFinal
- geracaoVerificada
Parameters
----------
period : DateTimeRange
The time period to retrieve data for.
site_names : list[str] | None
The names of the sites to retrieve data for. This name must match the object name in the database.
If set to None, all sites in the database will be requested.
output_type : Literal["DataFrame", "pl.DataFrame"], optional
Output type of the data. Can be one of ["DataFrame", "pl.DataFrame"]
By default "DataFrame"
Returns
-------
pd.DataFrame | pl.DataFrame
DataFrame containing the timeseries data. If output_type is "pl.DataFrame", a Polars DataFrame will be returned.
ErrorSummary
A summary of errors encountered during the retrieval process.
"""
# creating subperiods
subperiods = period.split_multiple(separator=timedelta(days=1), normalize=True)
# getting the objects and attributes from the database
ons_sites = self._ons._perfdb.objects.instances.get( # noqa: SLF001
object_names=site_names,
object_types=["ons_site"],
get_attributes=True,
attribute_names=["ons_site_id"],
)
# creating error ErrorSummary
error_summary = ErrorSummary(name="OnsSagerTimeseries")
if not ons_sites:
raise ValueError(f"No sites found with the specified names: {site_names}")
# iterating sites
df_list = []
df_schema = {
"disponibilidade": pl.Float64,
"geracaoLimitada": pl.Float64,
"geracaoReferencia": pl.Float64,
"geracaoReferenciaFinal": pl.Float64,
"geracaoVerificada": pl.Float64,
"horario": pl.String,
}
for site_name, site_data in ons_sites.items():
if "ons_site_id" not in site_data or not site_data["ons_site_id"]:
logger.error(f"Site '{site_name}' does not have 'ons_site_id' attribute. It will be skipped.")
# creating error object
error_obj = ErrorObject(
name=site_name,
exceptions=[ValueError(f"Site '{site_name}' does not have 'ons_site_id' attribute.")],
)
error_summary.add_child(error_obj)
continue
# getting site_id
site_id = site_data["ons_site_id"]
# iterating subperiods
for subperiod in subperiods:
endpoint = "ConsultarEventos/obterEventoDetalhado"
payload = {
"dataOcorrencia": f"{subperiod.start:%Y-%m-%d}T03:00:00.000Z",
"idUsinaConjunto": site_id,
"ehConjunto": True,
}
# get data passing payload as query string parameters
result = self._ons.sager.conn.get(endpoint=endpoint, params=payload, response_ok=None)
if (
isinstance(result.json(), list)
and len(result.json()) == 1
and result.json()[0]["message"] == "O evento não está mais disponível para realizar a operação"
):
# if we failed to get from historical data, lets get for the open events (not yet consisted)
endpoint = "ConsistenciaDoAgente/obterEvento"
result = self._ons.sager.conn.get(endpoint=endpoint, params=payload, response_ok=None)
if (
isinstance(result.json(), list)
and len(result.json()) == 1
and result.json()[0]["message"] == "O evento não está mais disponível para realizar a operação"
):
logger.warning(f"Event not available for site '{site_name}' and subperiod '{subperiod}'")
continue
if result.status_code != 200:
logger.error(f"Failed to retrieve data for site '{site_name}' and subperiod '{subperiod}': {result.text}")
# creating error object
error_obj = ErrorObject(
name=site_name,
exceptions=[
ValueError(f"Failed to retrieve data for site '{site_name}' and subperiod '{subperiod}': {result.text}"),
],
)
error_summary.add_child(error_obj)
continue
self._handle_http_errors(result)
result = result.json()
if "content" not in result or "horasEvento" not in result["content"]:
continue
power_data = result["content"]["horasEvento"]
df = pl.from_dicts(
power_data,
schema=df_schema,
)
# horário is like 00:00 - 00:29, split it into timestamp_start and timestamp_end
df = df.with_columns(
[
(
pl.col("horario")
.str.split(" - ")
.list.first()
.str.strptime(pl.Datetime, "%H:%M")
.dt.replace(year=subperiod.start.year, month=subperiod.start.month, day=subperiod.start.day)
).alias("timestamp_start"),
(
pl.col("horario")
.str.split(" - ")
.list.last()
.str.strptime(pl.Datetime, "%H:%M")
.dt.replace(year=subperiod.start.year, month=subperiod.start.month, day=subperiod.start.day, second=59)
).alias("timestamp_end"),
],
)
# dropping unnecessary columns
df = df.drop(["horario"])
# adding the site_name and ons_site_id as columns
df = df.with_columns(
[
pl.lit(site_name).alias("site_name"),
pl.lit(site_id).alias("ons_site_id"),
],
)
df_list.append(
df,
)
    # if df_list is empty, add an empty DataFrame matching the output schema
    # (per-site frames drop "horario" and add the timestamp columns, so the
    # empty frame must match or the sort below fails on the missing column)
    if not df_list:
        empty_schema = {k: v for k, v in df_schema.items() if k != "horario"} | {
            "timestamp_start": pl.Datetime,
            "timestamp_end": pl.Datetime,
            "site_name": pl.Utf8,
            "ons_site_id": pl.Int64,
        }
        df_list.append(pl.DataFrame(schema=empty_schema))
# concatenating all DataFrames
df = pl.concat(df_list)
# sorting by site_name and timestamp_start
df = df.sort(["site_name", "timestamp_start"])
if output_type == "pl.DataFrame":
return df, error_summary
return df.to_pandas(use_pyarrow_extension_array=True), error_summary
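To make the timestamp handling above concrete, here is a self-contained sketch of the `horario` split that `get` performs (the sample strings are illustrative, not real SAGER output):

```python
import datetime

import polars as pl

# SAGER labels each half hour like "00:00 - 00:29"; get() splits the label
# into concrete start/end timestamps anchored on the subperiod's date.
day = datetime.date(2024, 1, 1)
df = pl.DataFrame({"horario": ["00:00 - 00:29", "00:30 - 00:59"]})

df = df.with_columns(
    pl.col("horario")
    .str.split(" - ")
    .list.first()
    .str.strptime(pl.Datetime, "%H:%M")
    .dt.replace(year=day.year, month=day.month, day=day.day)
    .alias("timestamp_start"),
    pl.col("horario")
    .str.split(" - ")
    .list.last()
    .str.strptime(pl.Datetime, "%H:%M")
    .dt.replace(year=day.year, month=day.month, day=day.day, second=59)
    .alias("timestamp_end"),
).drop("horario")
# timestamp_end gets second=59 so each row covers its half hour without
# overlapping the next one, e.g. 00:00:00 to 00:29:59.
```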
import_database(period, site_names=None)
Imports the limitations time series to the performance database.

Parameters:

- period (DateTimeRange) – The period for which the data will be retrieved.
- site_names (list[str] | None, default: None) – The names of the sites to import data for. If None, all sites attached to the "ons_sager" data source will be imported. By default None.
Returns:

- ErrorSummary – A summary of errors encountered during the import process.
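A usage sketch, again with a hypothetical `DateTimeRange` construction and `ons` as an already-configured client:

```python
from datetime import datetime

# Hypothetical period construction; check the DateTimeRange docs for the real API.
period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 1, 8))

# Omitting site_names imports every object attached to the "ons_sager" data source.
errors = ons.sager.timeseries.import_database(period=period)
```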
Source code in echo_ons/ons_sager_timeseries.py
@validate_call
def import_database(self, period: DateTimeRange, site_names: list[str] | None = None) -> ErrorSummary:
"""Imports the limitations time series to the performance database
Parameters
----------
period : DateTimeRange
The period for which the data will be retrieved.
site_names : list[str] | None, optional
The names of the sites to import data for. If None, all sites attached to "ons_sager" data source will be imported.
By default None.
Returns
-------
ErrorSummary
A summary of errors encountered during the import process.
"""
# checking if all the requested objects exist in self._ons.sager.data_source_objects
if site_names:
wrong_objs = set(site_names) - set(self._ons.sager.data_source_objects)
if wrong_objs:
raise ValueError(f"Requested site names not connected to data source ons_sager: {wrong_objs}")
else:
site_names = self._ons.sager.data_source_objects
# getting the data
df, error_summary = self.get(period=period, site_names=site_names, output_type="pl.DataFrame")
# adding type hint
df: pl.DataFrame
if len(df) == 0:
logger.warning("No data found for the specified period and sites.")
return error_summary
# getting all the features that are associated with this data source type
features = self._ons._perfdb.features.definitions.get(object_models=["ons_site"], data_source_types=["ons_api"]) # noqa: SLF001
# checking if all the wanted features exist in the DataFrame
wanted_names = set(features["name_in_data_source"].dropna().to_list())
missing_names = wanted_names - set(df.columns)
if missing_names:
logger.warning(f"Could not find the following names in the DataFrame returned from ONS: {missing_names}")
# adding to error_summary
for site in site_names:
error_obj = ErrorObject(
name=site,
exceptions=[ValueError(f"Could not find the following names in the DataFrame returned from ONS: {missing_names}")],
)
error_summary.add_child(error_obj)
# filtering only the wanted columns
wanted_cols = ["timestamp_start", "site_name", *list(wanted_names)]
df = df[wanted_cols]
# creating dict to rename columns
rename_dict = {
"timestamp_start": "timestamp",
"site_name": "object_name",
}
feature_rename = features.reset_index()[["name", "name_in_data_source"]].set_index("name_in_data_source")["name"].to_dict()
rename_dict |= feature_rename
df = df.rename(rename_dict)
# converting to pandas
df = df.to_pandas(use_pyarrow_extension_array=True)
# pivoting
df = df.pivot(columns="object_name", index=["timestamp"])
# swap column levels
df = df.swaplevel(axis=1)
# rename column levels
df.columns.names = ["object_name", "feature_name"]
# converting index to datetime index
df.index = pd.to_datetime(df.index)
# uploading
self._ons._perfdb.features.values.series.insert(df) # noqa: SLF001
return error_summary
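The final pivot can be hard to visualize, so here is a standalone pandas sketch with made-up values showing how the long-format frame becomes the (object_name, feature_name) column layout that the performance database insert expects ("availability" stands in for a renamed feature column and is hypothetical):

```python
import pandas as pd

# Made-up long-format data mirroring the frame just before the pivot.
df = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(["2024-01-01 00:00", "2024-01-01 00:00"]),
        "object_name": ["site_a", "site_b"],
        "availability": [1.0, 0.8],
    }
)

df = df.pivot(columns="object_name", index=["timestamp"])  # values are implied
df = df.swaplevel(axis=1)  # -> (object_name, feature_name) column MultiIndex
df.columns.names = ["object_name", "feature_name"]
print(df)
# object_name        site_a       site_b
# feature_name availability availability
# timestamp
# 2024-01-01            1.0          0.8
```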