SAGER
sager
Module used to import data from the ONS SAGER system.
CustomHttpAdapter(ssl_context=None, **kwargs)
Class used to create a custom HTTPAdapter for the ONS system. It avoids the SSL error that occurs when connecting to the ONS servers.
Source code in echo_ons/sager.py
def __init__(self, ssl_context=None, **kwargs) -> None: # noqa
self.ssl_context = ssl_context
super().__init__(**kwargs)
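A minimal sketch of how such an adapter is typically completed and mounted, assuming it overrides init_poolmanager to hand the custom SSLContext to urllib3 (that override is not shown in the excerpt above) and that the ONS servers require legacy TLS renegotiation; all names other than the signature above are illustrative:
import ssl

import requests
import urllib3
from requests.adapters import HTTPAdapter


class LegacySslAdapter(HTTPAdapter):
    """Hypothetical adapter passing a custom SSLContext to urllib3."""

    def __init__(self, ssl_context=None, **kwargs):
        self.ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(self, connections, maxsize, block=False):
        # hand the custom context to the underlying connection pool
        self.poolmanager = urllib3.poolmanager.PoolManager(
            num_pools=connections, maxsize=maxsize, block=block, ssl_context=self.ssl_context
        )


ctx = ssl.create_default_context()
ctx.options |= 0x4  # OP_LEGACY_SERVER_CONNECT: tolerate legacy TLS renegotiation
session = requests.Session()
session.mount("https://", LegacySslAdapter(ssl_context=ctx))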
calc_limitation_events_lost_energy(period, objects=None)
Method used to calculate the produced and lost energy during each limitation event. This will be done for each SPE and results will be saved to table ons_spe_limitations in performance_db.
There are two calculations done here:
1. Using ONS methodology (based on their data)
- This depends on the data being imported to the database using the sager_importer method and the nominal power of each SPE and ONS site as defined in the database.
- Calculation will be done using features "active_power_verified" and "active_power_reference_final_calc".
2. Using our methodology (based on our data)
- This depends on data from wind turbines (aggregated to SPE level) and power meters.
- Calculation will be done using feature "active_power" from the power meter and "active_power_reference" from the SPE (which is calculated using the SPE features "active_power_theoretical" and "stopped_turbines_percent").
It's important to note that the calculations use numba for speed.
Parameters:
- period (DateTimeRange) – Period to calculate the produced and lost energy during each limitation event.
- objects (list[str] | None, default: None) – List of the desired ONS sites. If None or empty, all ONS sites will be processed. By default None.
Returns:
- list[str] – List of ONS sites that had errors during the calculations.
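A hedged usage sketch (the import path for DateTimeRange is assumed; "SITE_A" is a placeholder site name):
from datetime import datetime

from echo_ons.sager import calc_limitation_events_lost_energy

period = DateTimeRange(datetime(2024, 1, 1), datetime(2024, 2, 1))  # DateTimeRange assumed importable from the project's utilities
failed = calc_limitation_events_lost_energy(period, objects=["SITE_A"])  # None or [] would process every ONS site
if failed:
    print(f"Calculation failed for: {failed}")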
Source code in echo_ons/sager.py
def calc_limitation_events_lost_energy(period: DateTimeRange, objects: list[str] | None = None) -> list[str]:
"""Method used to calculate the produced and lost energy during each limitation event. This will be done for each SPE and results will be saved to table ons_spe_limitations in performance_db.
There are two calculations done here:
1. Using ONS methodology (based on their data)
- This depends on the data being imported to the database using the sager_importer method and the nominal power of each SPE and ONS site as defined in the database.
- Calculation will be done using features "active_power_verified" and "active_power_reference_final_calc".
2. Using our methodology (based on our data)
- This depends on data from wind turbines (aggregated to SPE level) and power meters.
- Calculation will be done using feature "active_power" from power meter and "active_power_reference" from SPE (which is calculated using the SPE features "active_power_theoretical" and "stopped_turbines_percent").
It's important to note that the calculations use numba for speed.
Parameters
----------
period : DateTimeRange
Period to calculate the produced and lost energy during each limitation event.
objects : list[str] | None, optional
List of the desired ONS sites. If None or empty, all ONS sites will be processed. By default None.
Returns
-------
list[str]
List of ONS sites that had errors during the calculations.
"""
if objects is None:
objects = []
@njit # numba decorator to speed up the calculations
def get_limitation_energies(
limitations: np.ndarray,
timeseries: np.ndarray,
timeseries_values: np.ndarray,
timestamp_length: np.timedelta64,
limitation_time: np.ndarray,
limitation_energy: np.ndarray,
zero_timedelta: np.timedelta64 = np.timedelta64(0, "ms"), # noqa
) -> np.ndarray:
"""Method to get how much energy was lost and produced during each limitation.
As this heavily uses for loops, it will use numba to speed up the calculations. As a result of that, we need to use numpy arrays instead of pandas DataFrames and carefully choose the data types of the arrays, including the precision of the np.datetime64 and np.timedelta64 objects.
Parameters
----------
limitations : np.ndarray
Array of shape (n, 2) that represents "start" and "end". n represents the number of limitations.
For this to work with numba, the array must be of type np.datetime64 with the same precision as all other inputs (usually ms).
timeseries : np.ndarray
Array of shape (t,) that represents "timestamp". t represents the number of timestamps.
For this to work with numba, the array must be of type np.datetime64 with the same precision as all other inputs (usually ms).
timeseries_values : np.ndarray
Array of shape (t, 2) that represents "actual" and "lost". t represents the number of timestamps.
For this to work with numba, the array must be of type np.float64.
timestamp_length : np.timedelta64
The length of each timestamp.
To work with numba this must be a np.timedelta64 object with same precision as all other inputs (usually ms).
limitation_time : np.ndarray
Array of shape (t) that represents how much time each timestamp has a limitation active.
It ideally would be created inside the function, but numba does not support np.timedelta64(0, precision) inside the compiled function.
It must be initialized with np.full((t,), fill_value=np.timedelta64(0), dtype="timedelta64[ms]").
limitation_energy : np.ndarray
Array of shape (n, 2) that represents "produced_energy", "lost_energy". n represents the number of limitations.
It ideally would be created inside the function, but numba does not support np.full((n, 2), fill_value=0.0, dtype="float64") inside the compiled function.
It must be initialized with np.full((n, 2), fill_value=0.0, dtype="float64").
zero_timedelta : np.timedelta64, optional
A np.timedelta64 object with value 0. To work with numba this must be a np.timedelta64 object with same precision as all other inputs (usually ms).
Ideally this would be created inside the function, but numba does not support np.timedelta64(0, precision) inside the compiled function.
By default np.timedelta64(0, "ms").
Returns
-------
np.ndarray
Array of shape (n, 2) that represents "produced_energy", "lost_energy". n represents the number of limitations.
"""
# filling NaNs with 0
timeseries_values = np.nan_to_num(timeseries_values, nan=0.0)
# getting amount of time in each timestamp that a limitation is active
for n in range(limitations.shape[0]):
for t in range(timeseries.shape[0]):
# checking if the limitation ends before the timestamp to break the loop
if timeseries[t] > limitations[n, 1]:
break
time_in_this_timestamp = get_time_in_timestamp(limitations[n, 0], limitations[n, 1], timeseries[t], timestamp_length)
limitation_time[t] += time_in_this_timestamp
for n in range(limitations.shape[0]):
lost_energy = 0.0
produced_energy = 0.0
for t in range(timeseries.shape[0]):
# checking if the limitation ends before the timestamp to break the loop
if timeseries[t] > limitations[n, 1]:
break
# getting the amount of time that the limitation is active in the timestamp
limitation_time_in_timestamp = get_time_in_timestamp(limitations[n, 0], limitations[n, 1], timeseries[t], timestamp_length)
# skipping if the limitation is not active in the timestamp
if limitation_time_in_timestamp == zero_timedelta:
continue
# getting the amount of energy produced and lost in the timestamp
produced_energy += timeseries_values[t, 0] * limitation_time_in_timestamp / limitation_time[t]
lost_energy += timeseries_values[t, 1] * limitation_time_in_timestamp / limitation_time[t]
limitation_energy[n, 0] = produced_energy
limitation_energy[n, 1] = lost_energy
return limitation_energy
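# Illustrative note (not part of the original source): a limitation spanning
# 45 min over 30-min timestamps overlaps the first timestamp for 30 min and
# the second for 15 min. If it is the only active limitation, limitation_time
# is [30, 15] min, so it is credited the full energy of both timestamps
# (30/30 and 15/15). Only when several limitations share a timestamp does
# limitation_time[t] exceed each event's own overlap, splitting that
# timestamp's energy proportionally instead of double counting it.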
@njit # numba decorator to speed up the calculations
def get_time_in_timestamp(
start: np.datetime64,
end: np.datetime64,
timestamp: np.datetime64,
timestamp_length: np.timedelta64,
zero_timedelta: np.timedelta64 = np.timedelta64(0, "ms"), # noqa
) -> np.timedelta64:
"""Method to get how much time a limitation is active in a timestamp.
Here we consider that the timestamps are closed on the left and open on the right.
Parameters
----------
start : np.datetime64
Start of the limitation. To work with numba this must be a np.datetime64 object with same precision as all other inputs (usually ms).
end : np.datetime64
End of the limitation. To work with numba this must be a np.datetime64 object with same precision as all other inputs (usually ms).
timestamp : np.datetime64
Start of the timestamp. To work with numba this must be a np.datetime64 object with same precision as all other inputs (usually ms).
timestamp_length : np.timedelta64
How long each timestamp lasts. To work with numba this must be a np.timedelta64 object with same precision as all other inputs (usually ms).
zero_timedelta : np.timedelta64, optional
A np.timedelta64 object with value 0. To work with numba this must be a np.timedelta64 object with same precision as all other inputs (usually ms).
Ideally this would be created inside the function, but numba does not support np.timedelta64(0, precision) inside the compiled function.
By default np.timedelta64(0, "ms").
Returns
-------
np.timedelta64
How much time the limitation is active in the timestamp.
"""
# checking if the limitation ends before the timestamp
if end < timestamp:
return zero_timedelta
# checking if the limitation starts after the timestamp
if start >= timestamp + timestamp_length:
return zero_timedelta
# checking if the limitation starts before the timestamp and ends after the timestamp
if start < timestamp and end >= timestamp + timestamp_length:
return timestamp_length
# checking if the limitation starts and ends inside the timestamp
if start >= timestamp and end < timestamp + timestamp_length:
return end - start
# checking if the limitation starts before the timestamp and ends inside the timestamp
if start < timestamp and end < timestamp + timestamp_length:
return end - timestamp
# checking if the limitation starts inside the timestamp and ends at or after the end of the timestamp
if start >= timestamp and end >= timestamp + timestamp_length:
return timestamp + timestamp_length - start
# if none of the above, something is wrong
raise ValueError(
"Something is wrong with the timestamps. start="
+ str(start)
+ " end="
+ str(end)
+ " timestamp="
+ str(timestamp)
+ " timestamp_length="
+ str(timestamp_length),
)
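# Note (added for clarity): taken together, the branches above compute the
# clamped interval overlap
#   max(zero_timedelta, min(end, timestamp + timestamp_length) - max(start, timestamp))
# spelled out case by case to stay numba-friendly and to flag impossible
# inputs explicitly.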
# list of objects with errors
error_objects = []
# getting objects information
perfdb = PerfDB(application_name="calc_limitation_events_lost_energy")
baze = Baze()
ons_objs = perfdb.objects.instances.get(object_types=["ons_site"], output_type="DataFrame", get_attributes=True)
spe_objs = perfdb.objects.instances.get(object_models=["wind_farm", "solar_farm"], output_type="DataFrame", get_attributes=True)
if not objects:
objects = ons_objs.index.to_list()
# checking if any of the wanted objects are not defined in the database
missing_objects = list(set(objects) - set(ons_objs.index.to_list()))
if missing_objects:
raise ValueError(f"The following objects are not defined in the database: {missing_objects}")
# iterating each object
for ons_obj in objects:
try:
# checking if the wanted object has its associated SPEs defined in the database
if "ons_spes" not in ons_objs.columns or not ons_objs.loc[ons_obj, "ons_spes"]:
logger.error(f"{ons_obj} - No SPEs defined in the database. Please specify them in the ons_spes attribute.")
error_objects.append(ons_obj)
continue
# getting the SPEs
ons_spes = ons_objs.loc[ons_obj, "ons_spes"]
# checking if all SPEs have nominal_power defined
if "nominal_power" not in spe_objs.columns or not spe_objs.loc[ons_spes, "nominal_power"].all():
raise ValueError(f"{ons_obj} - Not all SPEs {ons_spes} have nominal_power defined in the database.")
# getting a dict with the nominal power of each SPE
spe_nominal_power = spe_objs.loc[ons_spes, "nominal_power"].to_dict()
# checking if the ONS site has nominal_power defined
if "nominal_power" not in ons_objs.columns or not ons_objs.loc[ons_obj, "nominal_power"]:
raise ValueError(f"{ons_obj} - No nominal_power defined in the database.")
# getting nominal power for the ONS site
ons_site_nominal_power = ons_objs.loc[ons_obj, "nominal_power"]
# getting all the limitations for the period
limitations = perfdb.ons.limitations.get(period=period, object_names=[ons_obj], group_type="site", realtime=True).reset_index()
# checking if there are any limitations
if limitations.empty:
logger.info(f"{ons_obj} - No limitations found for the period {period}")
continue
# reducing the required period based on the limitations
# we widen the period by 30 minutes at the start and end, as ONS data comes in 30-minute intervals; this way we make sure no data is lost
this_period = DateTimeRange(
limitations["start"].min() - timedelta(minutes=30),
limitations["end"].max() + timedelta(minutes=30),
)
# creating a DataFrame with limitations for the SPE
spe_limitations = pd.DataFrame()
for spe in ons_spes:
spe_df = limitations[["object_name", "start"]].copy()
spe_df = spe_df.rename(columns={"object_name": "ons_site_name"}, level=0)
spe_df["spe_name"] = spe
for col in ["lost_energy", "produced_energy", "lost_energy_ons", "produced_energy_ons"]:
spe_df[col] = pd.NA
spe_limitations = pd.concat([spe_limitations, spe_df], axis=0)
spe_limitations = spe_limitations.set_index(["ons_site_name", "spe_name", "start"])
# * first calculating lost energy using ONS methodology (based on their data)
# getting the needed features
df = baze.points.values.series.get(
points={ons_obj: ["ActivePowerVerified_30min.AVG", "ActivePowerReferenceFinalCalc_30min.AVG"]},
period=this_period,
)
# dropping NaNs
df = df.dropna(how="all")
# renaming for easier use
df = df.rename(
columns={"ActivePowerVerified_30min.AVG": "actual", "ActivePowerReferenceFinalCalc_30min.AVG": "reference"},
level=1,
)
# converting average power to energy for easier use (each 30-minute timestamp contributes 30/60 of an hour)
df = df * 30 / 60
# calculating lost energy
df[(ons_obj, "lost")] = (df[(ons_obj, "reference")] - df[(ons_obj, "actual")]).clip(0, None)
# creating new columns for each SPE
for spe in ons_spes:
spe_df = df.loc[:, pd.IndexSlice[ons_obj, :]].copy()
# renaming first level to match the SPE name
spe_df = spe_df.rename(columns={ons_obj: spe}, level=0)
# adjusting values based on nominal power
spe_df = spe_df / ons_site_nominal_power * spe_nominal_power[spe]
# adding to the main DataFrame
df = pd.concat([df, spe_df], axis=1)
# getting the amount of energy produced and lost during each limitation
limitation_energy = get_limitation_energies(
limitations[["start", "end"]].to_numpy().astype("datetime64[ms]"),
df.index.to_numpy().astype("datetime64[ms]"),
df.loc[:, pd.IndexSlice[spe, ["actual", "lost"]]].astype("float64").to_numpy(),
np.timedelta64(30 * 60 * 1000, "ms"),
np.full((df.shape[0],), fill_value=np.timedelta64(0), dtype="timedelta64[ms]"),
np.full((limitations.shape[0], 2), fill_value=0.0, dtype="float64"),
)
# adding the calculated values to the DataFrame
spe_limitations.loc[pd.IndexSlice[ons_obj, spe, :], "produced_energy_ons"] = limitation_energy[:, 0]
spe_limitations.loc[pd.IndexSlice[ons_obj, spe, :], "lost_energy_ons"] = limitation_energy[:, 1]
# * then calculating lost energy using our methodology (based on the SPEs data)
# getting the needed features
spes_df = baze.points.values.series.get(
points={spe: ["ActivePowerReference_10min.AVG"] for spe in ons_spes},
period=this_period,
aggregation="Raw",
)
meters_df = baze.points.values.series.get(
points={f"{spe}-SMF1": ["ActivePower_5min.AVG"] for spe in ons_spes},
period=this_period,
aggregation="Raw",
)
# resample meters_df to 10 min (it was 5 min)
meters_df = meters_df.resample("10min", closed="right", label="right", origin="start_day").mean()
# merging the DataFrames
df = spes_df.merge(meters_df, left_index=True, right_index=True, how="outer")
# renaming for easier use
df = df.rename(columns={"ActivePower_5min.AVG": "actual", "ActivePowerReference_10min.AVG": "reference"}, level=1)
df = df.rename(columns={f"{spe}-SMF1": spe for spe in ons_spes}, level=0)
# shifting the index one timestamp to the left, as the data from the SPE and power meter is labeled at the end of the period
df.index = df.index - timedelta(minutes=10)
# dropping NaNs
df = df.dropna(how="all")
# converting from kW to MWh (divide by 1000 for MW, then by 6 because each 10-minute timestamp is 1/6 of an hour)
df = df / 1000.0 / 6
# iterating each SPE
for spe in ons_spes:
spe_df = df.xs(spe, axis=1, level=0).copy()
# dropping NaNs
spe_df = spe_df.dropna(how="any")
# calculating lost energy
spe_df["lost"] = (spe_df["reference"] - spe_df["actual"]).clip(0, None)
# getting the amount of energy produced and lost during each limitation
limitation_energy = get_limitation_energies(
limitations[["start", "end"]].to_numpy().astype("datetime64[ms]"),
spe_df.index.to_numpy().astype("datetime64[ms]"),
spe_df.loc[:, ["actual", "lost"]].astype("float64").to_numpy(),
np.timedelta64(10 * 60 * 1000, "ms"),
np.full((spe_df.shape[0],), fill_value=np.timedelta64(0), dtype="timedelta64[ms]"),
np.full((limitations.shape[0], 2), fill_value=0.0, dtype="float64"),
)
# adding the calculated values to the DataFrame
spe_limitations.loc[pd.IndexSlice[ons_obj, spe, :], "produced_energy"] = limitation_energy[:, 0]
spe_limitations.loc[pd.IndexSlice[ons_obj, spe, :], "lost_energy"] = limitation_energy[:, 1]
# * final adjustments and saving to database
# checking if there are any NaNs
na_count = spe_limitations.isna().sum().sum()
if na_count > 0:
logger.warning(
f"{ons_obj} - There are {na_count} NaNs in the calculated values (there should be none). They will be filled with 0.0 to allow for insertion on the database.",
)
# filling NaNs with 0
spe_limitations = spe_limitations.fillna(0.0)
# saving to database
perfdb.ons.limitations.insert(df=spe_limitations.reset_index(drop=False), group_type="spe", on_conflict="update")
except Exception:
logger.exception(f"{ons_obj} Could not calculate lost energy!")
error_objects.append(ons_obj)
continue
return error_objects
login_ons(username, password)
Function that logs in to the ONS system and returns a session object.
This uses Selenium to log in to the ONS system and obtain the token needed to use the API. We tried using requests directly but failed, so we fell back to Selenium, which is slower but works.
Parameters:
- username (str) – Username to log in to the ONS system.
- password (str) – Password to log in to the ONS system.
Returns:
- requests.Session – Session object with login information.
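A brief usage sketch (credentials are placeholders):
from echo_ons.sager import login_ons

session = login_ons("my_user", "my_password")  # drives a headless Chrome to harvest cookies and the OAuth token
print(session.headers["Authorization"])  # "Bearer <access_token>"
session.close()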
Source code in echo_ons/sager.py
def login_ons(username: str, password: str) -> requests.Session:
"""Function that login on ONS system and returns a session object.
This uses Selenium to login on the ONS system and get the necessary token to use the API. We tried using requests directly but failed so we had to use Selenium, which is slower but works.
Parameters
----------
username : str
Username to login on ONS system.
password : str
Password to login on ONS system.
Returns
-------
requests.Session
Session object with login information.
"""
# Step 1: Start a Selenium WebDriver session
chrome_options = Options()
chrome_options.add_argument("--headless") # Headless mode
chrome_options.add_argument("--no-sandbox") # Bypass OS security model
chrome_options.add_argument("--disable-dev-shm-usage") # Overcome limited resource problems
chrome_options.add_argument("--disable-gpu") # Disable GPU hardware acceleration
chrome_options.add_argument("--window-size=1920x1080") # Set window size
chrome_options.add_argument(
"user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
)
# try connection without installing first
try:
driver = webdriver.Chrome(options=chrome_options)
except Exception:
service = Service(ChromeDriverManager().install())
driver = webdriver.Chrome(service=service, options=chrome_options)
driver.set_window_size(1920, 1080) # Example size, adjust as needed
driver.get("https://pops.ons.org.br/ons.pop.federation")
# Step 2: Locate and fill in the login form
username_field = driver.find_element(By.NAME, "username")
password_field = driver.find_element(By.NAME, "password")
# Enter your credentials
username_field.send_keys(username)
password_field.send_keys(password)
# Take a screenshot (uncomment for debugging)
# driver.save_screenshot("screenshot.png") # noqa: ERA001
password_field.send_keys(Keys.RETURN) # Submit the form
# Step 3: Wait for the login to complete
logger.info("Accessing SAGER")
sleep(5)
# a high timeout is used here as this page takes a long time to load
# check every second whether the viewer iframe is available and, if so, switch to it
not_found_viewer = True
amount = 0
while not_found_viewer and amount < 60:
# find the viewer iframe (not using WebDriverWait here because it was not working)
try:
driver.switch_to.frame("viewer")
not_found_viewer = False
sleep(5)
except Exception:
logger.warning("Could not find the viewer iframe. Trying to find it again.")
amount += 1
sleep(1)
# Take a screenshot (uncomment for debugging)
# driver.save_screenshot("screenshot2.png") # noqa: ERA001
if not_found_viewer:
raise TimeoutError("Viewer iframe was not found after 60 attempts")
logger.info("Found the viewer iframe and switched to it")
# find SAGER button based on title SAGER
sager_button = driver.find_element("css selector", 'a[title="SAGER"]')
sager_button.click()
logger.info("Entered in SAGER page")
# Take a screenshot (uncomment for debugging)
# driver.save_screenshot("screenshot3.png") # noqa: ERA001
# getting session from selenium driver and transporting it to requests
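# _get_legacy_session (defined elsewhere in this module) is assumed to return a
# requests.Session mounted with CustomHttpAdapter, so the legacy-SSL workaround
# above also applies to the API calls below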
session = _get_legacy_session()
for cookie in driver.get_cookies():
session.cookies.set(cookie["name"], cookie["value"])
# quit the driver
driver.quit()
# getting token
token_response = session.post(
"https://pops.ons.org.br/ons.pop.federation/oauth2/token?grant_type=password&client_id=SAGER",
headers={
"Content-Type": "application/json",
"Accept": "application/json",
"charset": "utf-8",
"Origin": "https://apps18.ons.org.br",
"Referer": "https://apps18.ons.org.br",
},
data={"grant_type": "password", "client_id": "SAGER"},
)
session.headers.update({"Authorization": f"Bearer {token_response.json()['access_token']}"})
return session
ons_status_importer(objects=None, period=None, subperiod_length=None, overlap_interval=None)
Wrapper function to validate parameters, fetch, process, and save ONS Events data.
Parameters:
- objects (list[str | int] | None, default: None) – The desired ONS site objects, given either as the name of the ONS object or as its id number in performance_db. If not set, the function will fetch data for all objects. By default None.
- period (DateTimeRange, default: None) – The desired period to fetch. If not set, the function will use the period between the last registered status and now. By default None.
- subperiod_length (timedelta | None, default: None) – If set, the period will be split into subperiods of this length. This is useful to avoid timeouts when fetching data from the ONS API. By default 7 days.
- overlap_interval (timedelta | None, default: None) – If set, the start of the period will be moved back by the overlap_interval. This is useful to make sure previous data was correctly imported. If None, the start of the period will not be changed. By default None.
Returns:
- list[str] – List of objects that had errors during the process.
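A hedged usage sketch ("SITE_A" is a placeholder site name):
from datetime import timedelta

from echo_ons.sager import ons_status_importer

# fetch from the last registered status up to now, in 7-day chunks, re-importing 1 day of overlap
errors = ons_status_importer(objects=["SITE_A"], subperiod_length=timedelta(days=7), overlap_interval=timedelta(days=1))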
Source code in echo_ons/sager.py
def ons_status_importer(
objects: list[str | int] | None = None,
period: DateTimeRange = None,
subperiod_length: timedelta | None = None,
overlap_interval: timedelta | None = None,
) -> list[str]:
"""Wrapper function to validate parameters, fetch, process and save ONS Events data.
Parameters
----------
objects : list[str | int] | None, optional
The desired ONS site objects, given either as the name of the ONS object or as its id number in performance_db. If not set, the function will fetch data for all objects. By default None.
period : DateTimeRange, optional
The desired period to fetch. If not set, the function will use the period between the last registered status and now. By default None.
subperiod_length : timedelta | None, optional
If set, the period will be split into subperiods of this length. This is useful to avoid timeouts when fetching data from the ONS API.
By default will be 7 days.
overlap_interval : timedelta | None, optional
If set, the start of the period will be moved back by the overlap_interval. This is useful to make sure previous data was correctly imported.
If None, the start of the period will not be changed.
By default None.
Returns
-------
list[str]
List of objects that had errors during the process
"""
perfdb = PerfDB(application_name="ons_event_status_importer")
with perfdb.conn.reconnect() as conn:
# creating period
if not period:
start_date = pd.read_sql_query("SELECT date FROM ons_data_validation ORDER BY date DESC LIMIT 1", conn).iloc[0]["date"]
period = DateTimeRange(start=start_date, end=datetime.now())
if overlap_interval:
period.start -= overlap_interval
# attributes to connect to the ONS API
data_source = perfdb.datasources.instances.get(data_source_names=["ons_sager"], output_type="dict", get_attributes=True)[
"ons_sager"
]
conn_objs = data_source["object_names"]
# getting objects definitions
conn_objs = perfdb.objects.instances.get(object_names=conn_objs, output_type="DataFrame", get_attributes=True)
# simple object validation
error_objects = []
if not objects:
objects = conn_objs["ons_site_id"].to_list()
elif all(isinstance(x, str) for x in objects):
error_objects.extend(ob for ob in objects if ob not in conn_objs.index.to_list())
objects = conn_objs["ons_site_id"].to_list()
elif all(isinstance(x, int) for x in objects):
error_objects.extend(ob for ob in objects if ob not in conn_objs["id"].to_list())
if error_objects:
msg = f"The objects wit ons ids {', '.join(str(error_objects))} do not exist on performance_db"
logger.critical(msg)
return error_objects
logger.info("Object validation: OK")
# splitting into subperiods
if subperiod_length is None:
subperiod_length = timedelta(days=7)
subperiods = period.split_multiple(separator=subperiod_length, normalize=True)
session = None
for subperiod in subperiods:
subperiod_attempts = 0
subperiod_success = False
while not subperiod_success:
try:
logger.info(f"Getting data for {subperiod}")
if session is None:
success = False
try_count = 0
while not success:
try:
session = login_ons(data_source["user"], data_source["password"])
success = True
except Exception as ex:
logger.exception("Failed to connect to ONS")
session = None
try_count += 1
if try_count > 3:
raise ConnectionError(f"Failed to connect to ONS after {try_count} attempts") from ex
logger.info("Connected to ONS!")
events = _get_all_status(session, objects, subperiod)
events = _process_status_data(events)
_save_status_data(events, conn_objs)
subperiod_success = True
except Exception:
logger.exception(f"Failed to get data for {subperiod}")
subperiod_attempts += 1
if subperiod_attempts > 3:
logger.error(f"Failed to get data for {subperiod} after {subperiod_attempts} attempts. Skipping to the next subperiod.")
break
return []
sager_importer(period, objects=None, recalc_features=False, calc_limitation_lost_energy=True, overlap_interval=None)
Function that gathers limitation and time series data from the ONS SAGER system and saves it to our database.
If requested, it will also recalculate the features of the imported objects.
Parameters:
- period (dict[str, datetime | None] | DateTimeRange) – The period to fetch data from. If a dict is passed, it must contain the keys "start" and "end"; each can be a datetime object or None, in which case it will be set dynamically using existing data from the database. If a DateTimeRange object is passed, its start and end datetimes will be used.
- objects (list[str] | None, default: None) – List with the names of the objects as in the Performance Database. These objects must be connected to data source "ons_sager". If None or empty, all objects connected to the data source will be imported. By default None.
- recalc_features (bool, default: False) – If True, the function will recalculate the features of the ONS objects. By default False.
- calc_limitation_lost_energy (bool, default: True) – If True, the function will calculate the produced and lost energy during each limitation event. This will be done for each SPE and the results will be saved to the table ons_spe_limitations in performance_db. By default True.
- overlap_interval (timedelta | None, default: None) – If set, the start of the period will be moved back by the overlap_interval. This is useful to make sure previous data was correctly imported. If None, the start of the period will not be changed. By default None.
Returns:
- dict[str, DateTimeRange] – Dict with the imported objects and the periods that were imported, in the format {object_name: DateTimeRange, ...}.
- list[str] – List with the names of the objects that failed to import.
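A hedged usage sketch ("SITE_A" is a placeholder site name):
from datetime import timedelta

from echo_ons.sager import sager_importer

imported, failed = sager_importer(
    period={"start": None, "end": None},  # start/end resolved from the database and current time
    objects=["SITE_A"],
    recalc_features=False,
    calc_limitation_lost_energy=True,
    overlap_interval=timedelta(hours=6),
)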
Source code in echo_ons/sager.py
def sager_importer(
period: dict[str, datetime | None] | DateTimeRange,
objects: list[str] | None = None,
recalc_features: bool = False,
calc_limitation_lost_energy: bool = True,
overlap_interval: timedelta | None = None,
) -> tuple[dict[str, DateTimeRange], list[str]]:
# sourcery skip: low-code-quality
"""Function that gathers limitation and time series data from the ONS SAGER system and saves it to our database.
If requested, it will also recalculate the features of the imported objects.
Parameters
----------
period : dict[str, datetime | None] | DateTimeRange
The period to fetch data from.
If a dict is passed, it must contain the keys "start" and "end". They can be a datetime object or None. If None, they will be dynamically set using existing data from the database.
If a DateTimeRange object is passed, the start and end datetimes will be used.
objects : list[str] | None, optional
List with the names of the objects as in the Performance Database.
These objects must be connected to data source "ons_sager".
If None or empty, all objects connected to the data source will be imported. By default None.
recalc_features : bool, optional
If True, the function will recalculate the features of the ONS objects, by default False.
calc_limitation_lost_energy : bool, optional
If True, the function will calculate the produced and lost energy during each limitation event. This will be done for each SPE and results will be saved to table ons_spe_limitations in performance_db.
By default True.
overlap_interval : timedelta | None, optional
If set, the start of the period will be moved back by the overlap_interval. This is useful to make sure previous data was correctly imported.
If None, the start of the period will not be changed.
By default None.
Returns
-------
dict[str, DateTimeRange]
Dict with the imported objects and the periods that were imported. It will be in the format {object_name: DateTimeRange, ...}.
list[str]
List with the names of the objects that failed to import.
"""
if objects is None:
objects = []
# dict with the imported objects and their imported periods
imported_objects = {}
# list of objects with errors
error_objects = []
# getting data source information
perfdb = PerfDB(application_name="sager_importer")
data_source = perfdb.datasources.instances.get(data_source_names=["ons_sager"], output_type="dict", get_attributes=True)["ons_sager"]
conn_objs = data_source["object_names"]
# getting objects definitions
conn_objs = perfdb.objects.instances.get(object_names=conn_objs, output_type="DataFrame", get_attributes=True)
if not objects:
objects = conn_objs.index.to_list()
if missing_objects := list(set(objects) - set(conn_objs.index.to_list())):
logger.error(f"The following objects are not connected to the data source: {missing_objects}")
# removing missing objects from the list
objects = list(set(objects) - set(missing_objects))
# adding to the error_objects list
error_objects += missing_objects
def connect_to_ons(data_source: dict, max_try: int = 3) -> requests.Session:
"""Function to connect to the ONS API.
Parameters
----------
data_source : dict
Dictionary containing the user and password to connect to the ONS API.
max_try : int, optional
Maximum number of attempts to connect to the ONS API. By default 3.
Returns
-------
requests.Session
Session object with login information.
"""
success = False
try_count = 0
while not success:
try:
session = login_ons(data_source["user"], data_source["password"])
success = True
except Exception as ex:
logger.exception("Failed to connect to ONS")
try_count += 1
if try_count > max_try:
raise ConnectionError(f"Failed to connect to ONS after {try_count} attempts") from ex
return session
session = connect_to_ons(data_source)
# iterating each object
for obj in objects:
try:
# getting the period to fetch data from
this_period = period.copy()
if isinstance(period, dict):
if this_period["end"] is None:
this_period["end"] = datetime.now()
if this_period["start"] is None:
# getting latest data from the database
query = f"SELECT start FROM v_ons_limitations_realtime WHERE object_name = '{obj}' ORDER BY start DESC LIMIT 1" # noqa
with perfdb.conn.reconnect() as conn:
last_start = conn.read_to_pandas(query)
if last_start.empty:
this_period["start"] = datetime(2020, 1, 1)
else:
this_period["start"] = last_start.iloc[0, 0]
# converting to DateTimeRange
this_period = DateTimeRange.from_dict(this_period)
# adjusting start of the period if requested
if overlap_interval is not None:
if not isinstance(overlap_interval, timedelta):
raise TypeError(f"overlap_interval must be a timedelta object, got {type(overlap_interval)}")
this_period.start -= overlap_interval
imported_objects[obj] = this_period
logger.info(f"{obj} - Getting data from {this_period}")
# splitting the period into days
day_periods = this_period.split_multiple(separator=timedelta(days=1), normalize=True)
# iterating each day
for day_period in day_periods:
try:
success = False
retry = 0
while not success and retry < 2:
try:
# getting the day
day = day_period.start
# getting the data
data = _get_data(session, day, conn_objs.loc[obj, "ons_site_id"])
success = True
# checking if the data is empty
if data is None:
continue
# processing the data
processed_data, ons_generation_data = _process_data(data, day, obj)
# saving the data
_save_data(processed_data, obj, ons_generation_data)
except Exception:
# reconnecting to ONS
logger.exception(f"{obj} - Could not get data for the day {day}. Will try to connect again.")
session = connect_to_ons(data_source)
retry += 1
if not success:
raise ConnectionError("Could not get data from ONS")
except Exception:
logger.exception(f"{obj} - Could not get data for the day {day}")
error_objects.append(obj)
continue
except Exception:
logger.exception(f"{obj} Could not get data!")
error_objects.append(obj)
continue
session.close()
# recalculating features
if recalc_features:
from echo_energycalc import (
CalculationHandler, # pylint: disable=import-outside-toplevel
)
logger.info("Recalculating features")
for obj, obj_period in imported_objects.items():
logger.info(f"{obj} - Recalculating features for period {obj_period}")
try:
calc = CalculationHandler.from_type_defaults(object_filters={"object_names": [obj]})[0]
errs = calc.calculate(period=obj_period)
if errs.total_exception_count > 0:
raise RuntimeError(f"Errors occurred while recalculating features for {obj} and period {obj_period}")
except Exception:
logger.exception(f"{obj} - Could not recalculate features")
error_objects.append(obj)
continue
if calc_limitation_lost_energy:
logger.info("Calculating limitation events lost energy")
for obj, obj_period in imported_objects.items():
logger.info(f"{obj} - Calculating limitation events lost energy for period {obj_period}")
try:
if errs := calc_limitation_events_lost_energy(
obj_period,
objects=[obj],
):
raise RuntimeError(f"Errors occurred while calculating limitation events lost energy for {obj} and period {obj_period}")
except Exception:
logger.exception(f"{obj} - Could not calculate limitation events lost energy")
error_objects.append(obj)
continue
error_objects = sorted(set(error_objects))
return imported_objects, error_objects