SAGER

sager

Module used to import data from the ONS SAGER system.

CustomHttpAdapter(ssl_context=None, **kwargs)

Class used to create a custom HTTPAdapter for use with the ONS system. It avoids the SSL error that occurs when connecting to the ONS system.

Source code in echo_ons/sager.py
def __init__(self, ssl_context=None, **kwargs) -> None:  # noqa
    self.ssl_context = ssl_context
    super().__init__(**kwargs)
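
A minimal usage sketch, not this module's own wiring: it assumes the adapter forwards ssl_context to its pool manager, and that the SSL error in question is OpenSSL's legacy-renegotiation failure (both assumptions).

import ssl

import requests

ctx = ssl.create_default_context()
# re-enable legacy server connect (hypothetical cause of the ONS SSL error)
ctx.options |= getattr(ssl, "OP_LEGACY_SERVER_CONNECT", 0x4)

session = requests.Session()
session.mount("https://", CustomHttpAdapter(ssl_context=ctx))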

calc_limitation_events_lost_energy(period, objects=None)

Method used to calculate the produced and lost energy during each limitation event. This will be done for each SPE and results will be saved to table ons_spe_limitations in performance_db.

There are two calculations done here:

  1. Using ONS methodology (based on their data)

    • This depends on the data being imported to the database using the sager_importer method and the nominal power of each SPE and ONS site as defined in the database.
    • Calculation will be done using features "active_power_verified" and "active_power_reference_final_calc".
  2. Using our methodology (based on our data)

    • This depends on data from wind turbines (aggregated to SPE level) and power meters.
    • Calculation will be done using feature "active_power" from power meter and "active_power_reference" from SPE (which is calculated using the SPE features "active_power_theoretical" and "stopped_turbines_percent").

It's important to note that the calculations use numba for speed.
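
As a rough illustration of the arithmetic both methodologies share (the numbers are made up), each timestamp's average power is converted to energy and the shortfall against the reference is clipped at zero:

import numpy as np

# hypothetical 30-minute ONS values, in MW
reference = np.array([10.0, 8.0])  # active_power_reference_final_calc
actual = np.array([6.0, 9.0])      # active_power_verified

# average power over 30 minutes -> energy in MWh
reference_mwh = reference * 30 / 60
actual_mwh = actual * 30 / 60

# lost energy is the clipped shortfall; producing above reference loses nothing
lost_mwh = np.clip(reference_mwh - actual_mwh, 0.0, None)
print(lost_mwh)  # [2. 0.]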

Parameters:

  • period

    (DateTimeRange) –

    Period to calculate the produced and lost energy during each limitation event.

  • objects

    (list[str], default: None ) –

    List of the desired ONS sites. If None or empty, all sites will be processed. By default None.

Returns:

  • list[str]

    List of ONS sites that had errors during the calculations.
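
Example (a hedged sketch; "SITE_A" is a placeholder ONS site name):

from datetime import datetime

period = DateTimeRange(start=datetime(2024, 1, 1), end=datetime(2024, 2, 1))
errors = calc_limitation_events_lost_energy(period, objects=["SITE_A"])
if errors:
    print(f"Sites with calculation errors: {errors}")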

Source code in echo_ons/sager.py
def calc_limitation_events_lost_energy(period: DateTimeRange, objects: list[str] | None = None) -> list[str]:
    """Method used to calculate the produced and lost energy during each limitation event. This will be done for each SPE and results will be saved to table ons_spe_limitations in performance_db.

    There are two calculations done here:

    1. Using ONS methodology (based on their data)

        - This depends on the data being imported to the database using the sager_importer method and the nominal power of each SPE and ONS site as defined in the database.
        - Calculation will be done using features "active_power_verified" and "active_power_reference_final_calc".

    2. Using our methodology (based on our data)

        - This depends on data from wind turbines (aggregated to SPE level) and power meters.
        - Calculation will be done using feature "active_power" from power meter and "active_power_reference" from SPE (which is calculated using the SPE features "active_power_theoretical" and "stopped_turbines_percent").

    It's important to note that the calculations use numba for speed.

    Parameters
    ----------
    period : DateTimeRange
        Period to calculate the produced and lost energy during each limitation event.
    objects : list[str], optional
        List of the desired ONS sites. If None or empty, all sites will be processed. By default None.

    Returns
    -------
    list[str]
        List of ONS sites that had errors during the calculations.

    """
    if objects is None:
        objects = []

    @njit  # numba decorator to speed up the calculations
    def get_limitation_energies(
        limitations: np.ndarray,
        timeseries: np.ndarray,
        timeseries_values: np.ndarray,
        timestamp_length: np.timedelta64,
        limitation_time: np.ndarray,
        limitation_energy: np.ndarray,
        zero_timedelta: np.timedelta64 = np.timedelta64(0, "ms"),  # noqa
    ) -> np.ndarray:
        """Method to get how much energy was lost and produced during each limitation.

        As this heavily uses for loops, it will use numba to speed up the calculations. As a result of that, we need to use numpy arrays instead of pandas DataFrames and carefully choose the data types of the arrays, including the precision of the np.datetime64 and np.timedelta64 objects.

        Parameters
        ----------
        limitations : np.ndarray
            Array of shape (n, 2) that represents "start" and "end". n represents the number of limitations.

            For this to work with numba, the array must be of type np.datetime64 with the same precision as all other inputs (usually ms).
        timeseries : np.ndarray
            Array of shape (t) that represents "timestamp". t represents the number of timestamps.

            For this to work with numba, the array must be of type np.datetime64 with the same precision as all other inputs (usually ms).
        timeseries_values : np.ndarray
            Array of shape (t, 2) that represents "actual" and "lost". t represents the number of timestamps.

            For this to work with numba, the array must be of type np.float64.
        timestamp_length : np.timedelta64
            The length of each timestamp.

            To work with numba this must be a np.timedelta64 object with same precision as all other inputs (usually ms).
        limitation_time : np.ndarray
            Array of shape (t) that represents how much time a limitation is active in each timestamp.

            It ideally would be created inside the function, but numba does not support np.timedelta64(0, precision) inside the compiled function.

            It must be initialized with np.full((t,), fill_value=np.timedelta64(0), dtype="timedelta64[ms]").
        limitation_energy : np.ndarray
            Array of shape (n, 2) that represents "produced_energy", "lost_energy". n represents the number of limitations.

            It ideally would be created inside the function, but numba did not support np.full((n, 2), fill_value=0.0, dtype="float64") inside the compiled function.

            It must be initialized with np.full((n, 2), fill_value=0.0, dtype="float64").
        zero_timedelta : np.timedelta64, optional
            A np.timedelta64 object with value 0. To work with numba this must be a np.timedelta64 object with same precision as all other inputs (usually ms).

            Ideally this would be created inside the function, but numba does not support np.timedelta64(0, precision) inside the compiled function.

            By default np.timedelta64(0, "ms").

        Returns
        -------
        np.ndarray
            Array of shape (n, 2) that represents "produced_energy", "lost_energy". n represents the number of limitations.

        """
        # filling NaNs with 0
        timeseries_values = np.nan_to_num(timeseries_values, nan=0.0)

        # getting amount of time in each timestamp that a limitation is active
        for n in range(limitations.shape[0]):
            for t in range(timeseries.shape[0]):
                # checking if the limitation ends before the timestamp to break the loop
                if timeseries[t] > limitations[n, 1]:
                    break

                time_in_this_timestamp = get_time_in_timestamp(limitations[n, 0], limitations[n, 1], timeseries[t], timestamp_length)
                limitation_time[t] += time_in_this_timestamp

        for n in range(limitations.shape[0]):
            lost_energy = 0.0
            produced_energy = 0.0
            for t in range(timeseries.shape[0]):
                # checking if the limitation ends before the timestamp to break the loop
                if timeseries[t] > limitations[n, 1]:
                    break

                # getting the amount of time that the limitation is active in the timestamp
                limitation_time_in_timestamp = get_time_in_timestamp(limitations[n, 0], limitations[n, 1], timeseries[t], timestamp_length)

                # skipping if the limitation is not active in the timestamp
                if limitation_time_in_timestamp == zero_timedelta:
                    continue

                # attributing the timestamp's produced and lost energy to this limitation, proportionally to its share of the total limitation time in limitation_time[t]
                produced_energy += timeseries_values[t, 0] * limitation_time_in_timestamp / limitation_time[t]
                lost_energy += timeseries_values[t, 1] * limitation_time_in_timestamp / limitation_time[t]

            limitation_energy[n, 0] = produced_energy
            limitation_energy[n, 1] = lost_energy

        return limitation_energy

    @njit  # numba decorator to speed up the calculations
    def get_time_in_timestamp(
        start: np.datetime64,
        end: np.datetime64,
        timestamp: np.datetime64,
        timestamp_length: np.timedelta64,
        zero_timedelta: np.timedelta64 = np.timedelta64(0, "ms"),  # noqa
    ) -> np.timedelta64:
        """Method to get how much time a limitation is active in a timestamp.

        Here we consider that the timestamps are closed on the left and open on the right.

        Parameters
        ----------
        start : np.datetime64
            Start of the limitation. To work with numba this must be a np.datetime64 object with same precision as all other inputs (usually ms).
        end : np.datetime64
            End of the limitation. To work with numba this must be a np.datetime64 object with same precision as all other inputs (usually ms).
        timestamp : np.datetime64
            Start of the timestamp. To work with numba this must be a np.datetime64 object with same precision as all other inputs (usually ms).
        timestamp_length : np.timedelta64
            How long each timestamp lasts. To work with numba this must be a np.timedelta64 object with same precision as all other inputs (usually ms).
        zero_timedelta : np.timedelta64, optional
            A np.timedelta64 object with value 0. To work with numba this must be a np.timedelta64 object with same precision as all other inputs (usually ms).

            Ideally this would be created inside the function, but numba does not support np.timedelta64(0, precision) inside the compiled function.

            By default np.timedelta64(0, "ms").

        Returns
        -------
        np.timedelta64
            how much time the limitation is active in the timestamp

        """
        # checking if the limitation ends before the timestamp
        if end < timestamp:
            return zero_timedelta
        # checking if the limitation starts after the timestamp
        if start >= timestamp + timestamp_length:
            return zero_timedelta

        # checking if the limitation starts before the timestamp and ends after the timestamp
        if start < timestamp and end >= timestamp + timestamp_length:
            return timestamp_length
        # checking if the limitation starts and ends inside the timestamp
        if start >= timestamp and end < timestamp + timestamp_length:
            return end - start
        # checking if the limitation starts before the timestamp and ends inside the timestamp
        if start < timestamp and end < timestamp + timestamp_length:
            return end - timestamp
        # checking if the limitation starts inside the timestamp and ends after the timestamp
        if start >= timestamp and end > timestamp + timestamp_length:
            return timestamp + timestamp_length - start
        # if none of the above, something is wrong
        raise ValueError(
            "Something is wrong with the timestamps. start="
            + str(start)
            + " end="
            + str(end)
            + " timestamp="
            + str(timestamp)
            + " timestamp_length="
            + str(timestamp_length),
        )
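
    # Worked example for get_time_in_timestamp (hypothetical values): a
    # limitation running 00:05-00:20 checked against the 30-minute timestamp
    # starting at 00:00 starts and ends inside that timestamp, so the function
    # returns end - start, i.e. 15 minutes of active limitation time.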

    # list of objects with errors
    error_objects = []

    # getting objects information
    perfdb = PerfDB(application_name="calc_limitation_events_lost_energy")
    baze = Baze()
    ons_objs = perfdb.objects.instances.get(object_types=["ons_site"], output_type="DataFrame", get_attributes=True)
    spe_objs = perfdb.objects.instances.get(object_models=["wind_farm", "solar_farm"], output_type="DataFrame", get_attributes=True)

    if not objects:
        objects = ons_objs.index.to_list()

    # checking if any of the wanted objects are not defined in the database
    missing_objects = list(set(objects) - set(ons_objs.index.to_list()))
    if missing_objects:
        raise ValueError(f"The following objects are not defined in the database: {missing_objects}")

    # iterating each object
    for ons_obj in objects:
        try:
            # checking if the wanted object has its associated SPEs defined in the database
            if "ons_spes" not in ons_objs.columns or not ons_objs.loc[ons_obj, "ons_spes"]:
                logger.error(f"{ons_obj} - No SPEs defined in the database. Please specify them in the ons_spes attribute.")
                error_objects.append(ons_obj)
                continue

            # getting the SPEs
            ons_spes = ons_objs.loc[ons_obj, "ons_spes"]

            # checking if all SPEs have nominal_power defined
            if "nominal_power" not in spe_objs.columns or not spe_objs.loc[ons_spes, "nominal_power"].all():
                raise ValueError(f"{ons_obj} - Not all SPEs {ons_spes} have nominal_power defined in the database.")

            # getting a dict with the nominal power of each SPE
            spe_nominal_power = spe_objs.loc[ons_spes, "nominal_power"].to_dict()

            # checking if the ONS site has nominal_power defined
            if "nominal_power" not in ons_objs.columns or not ons_objs.loc[ons_obj, "nominal_power"]:
                raise ValueError(f"{ons_obj} - No nominal_power defined in the database.")

            # getting nominal power for the ONS site
            ons_site_nominal_power = ons_objs.loc[ons_obj, "nominal_power"]

            # getting all the limitations for the period
            limitations = perfdb.ons.limitations.get(period=period, object_names=[ons_obj], group_type="site", realtime=True).reset_index()

            # checking if there are any limitations
            if limitations.empty:
                logger.info(f"{ons_obj} - No limitations found for the period {period}")
                continue

            # reducing the required period based on the limitations
            # we are adding 30 minutes to the start and end of the period as ONS data is in 30 minute intervals, this way we make sure no data is lost
            this_period = DateTimeRange(
                limitations["start"].min() - timedelta(minutes=30),
                limitations["end"].max() + timedelta(minutes=30),
            )

            # creating a DataFrame with limitations for the SPE
            spe_limitations = pd.DataFrame()
            for spe in ons_spes:
                spe_df = limitations[["object_name", "start"]].copy()
                spe_df = spe_df.rename(columns={"object_name": "ons_site_name"}, level=0)
                spe_df["spe_name"] = spe
                for col in ["lost_energy", "produced_energy", "lost_energy_ons", "produced_energy_ons"]:
                    spe_df[col] = pd.NA
                spe_limitations = pd.concat([spe_limitations, spe_df], axis=0)
            spe_limitations = spe_limitations.set_index(["ons_site_name", "spe_name", "start"])

            # * first calculating lost energy using ONS methodology (based on their data)

            # getting the needed features
            df = baze.points.values.series.get(
                points={ons_obj: ["ActivePowerVerified_30min.AVG", "ActivePowerReferenceFinalCalc_30min.AVG"]},
                period=this_period,
            )
            # dropping NaNs
            df = df.dropna(how="all")

            # renaming for easier use
            df = df.rename(
                columns={"ActivePowerVerified_30min.AVG": "actual", "ActivePowerReferenceFinalCalc_30min.AVG": "reference"},
                level=1,
            )

            # converting 30-minute average power to energy (MW -> MWh) for easier use
            df = df * 30 / 60

            # calculating lost energy
            df[(ons_obj, "lost")] = (df[(ons_obj, "reference")] - df[(ons_obj, "actual")]).clip(0, None)

            # creating new columns for each SPE
            for spe in ons_spes:
                spe_df = df.loc[:, pd.IndexSlice[ons_obj, :]].copy()
                # renaming first level to match the SPE name
                spe_df = spe_df.rename(columns={ons_obj: spe}, level=0)
                # adjusting values based on nominal power
                spe_df = spe_df / ons_site_nominal_power * spe_nominal_power[spe]
                # adding to the main DataFrame
                df = pd.concat([df, spe_df], axis=1)

                # getting the amount of energy produced and lost during each limitation
                limitation_energy = get_limitation_energies(
                    limitations[["start", "end"]].to_numpy().astype("datetime64[ms]"),
                    df.index.to_numpy().astype("datetime64[ms]"),
                    df.loc[:, pd.IndexSlice[spe, ["actual", "lost"]]].astype("float64").to_numpy(),
                    np.timedelta64(30 * 60 * 1000, "ms"),
                    np.full((df.shape[0],), fill_value=np.timedelta64(0), dtype="timedelta64[ms]"),
                    np.full((limitations.shape[0], 2), fill_value=0.0, dtype="float64"),
                )

                # adding the calculated values to the DataFrame
                spe_limitations.loc[pd.IndexSlice[ons_obj, spe, :], "produced_energy_ons"] = limitation_energy[:, 0]
                spe_limitations.loc[pd.IndexSlice[ons_obj, spe, :], "lost_energy_ons"] = limitation_energy[:, 1]

            # * then calculating lost energy using our methodology (based on the SPEs data)

            # getting the needed features
            spes_df = baze.points.values.series.get(
                points={spe: ["ActivePowerReference_10min.AVG"] for spe in ons_spes},
                period=this_period,
                aggregation="Raw",
            )
            meters_df = baze.points.values.series.get(
                points={f"{spe}-SMF1": ["ActivePower_5min.AVG"] for spe in ons_spes},
                period=this_period,
                aggregation="Raw",
            )

            # resample meters_df to 10 min (it was 5 min)
            meters_df = meters_df.resample("10min", closed="right", label="right", origin="start_day").mean()

            # merging the DataFrames
            df = spes_df.merge(meters_df, left_index=True, right_index=True, how="outer")

            # renaming for easier use
            df = df.rename(columns={"ActivePower_5min.AVG": "actual", "ActivePowerReference_10min.AVG": "reference"}, level=1)
            df = df.rename(columns={f"{spe}-SMF1": spe for spe in ons_spes}, level=0)

            # shifting the index 1 timestamp to the left as the data from spe and power meter represents the end of the period
            df.index = df.index - timedelta(minutes=10)

            # dropping NaNs
            df = df.dropna(how="all")

            # converting from kW to MWh (kW / 1000 -> MW, then / 6 as each 10-minute interval is a sixth of an hour)
            df = df / 1000.0 / 6

            # iterating each SPE
            for spe in ons_spes:
                spe_df = df.xs(spe, axis=1, level=0).copy()
                # dropping NaNs
                spe_df = spe_df.dropna(how="any")
                # calculating lost energy
                spe_df["lost"] = (spe_df["reference"] - spe_df["actual"]).clip(0, None)

                # getting the amount of energy produced and lost during each limitation
                limitation_energy = get_limitation_energies(
                    limitations[["start", "end"]].to_numpy().astype("datetime64[ms]"),
                    spe_df.index.to_numpy().astype("datetime64[ms]"),
                    spe_df.loc[:, ["actual", "lost"]].astype("float64").to_numpy(),
                    np.timedelta64(10 * 60 * 1000, "ms"),
                    np.full((spe_df.shape[0],), fill_value=np.timedelta64(0), dtype="timedelta64[ms]"),
                    np.full((limitations.shape[0], 2), fill_value=0.0, dtype="float64"),
                )

                # adding the calculated values to the DataFrame
                spe_limitations.loc[pd.IndexSlice[ons_obj, spe, :], "produced_energy"] = limitation_energy[:, 0]
                spe_limitations.loc[pd.IndexSlice[ons_obj, spe, :], "lost_energy"] = limitation_energy[:, 1]

            # * final adjustments and saving to database

            # checking if there are any NaNs
            na_count = spe_limitations.isna().sum().sum()
            if na_count > 0:
                logger.warning(
                    f"{ons_obj} - There are {na_count} NaNs in the calculated values (there should be none). They will be filled with 0.0 to allow for insertion on the database.",
                )

            # filling NaNs with 0
            spe_limitations = spe_limitations.fillna(0.0)

            # saving to database
            perfdb.ons.limitations.insert(df=spe_limitations.reset_index(drop=False), group_type="spe", on_conflict="update")

        except Exception:
            logger.exception(f"{ons_obj} Could not calculate lost energy!")
            error_objects.append(ons_obj)
            continue

    return error_objects

login_ons(username, password)

Function that logs in to the ONS system and returns a session object.

This uses Selenium to log in to the ONS system and obtain the token needed to use the API. We tried using requests directly but failed, so we had to use Selenium, which is slower but works.

Parameters:

  • username

    (str) –

    Username to login on ONS system.

  • password

    (str) –

    Password to login on ONS system.

Returns:

  • Session

    Session object with login information.
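
Example (a hedged sketch; the credentials and the follow-up request are placeholders):

session = login_ons("my_user", "my_password")
try:
    # the session now carries the ONS cookies and a Bearer token header,
    # so SAGER API calls can be made like any other requests call
    response = session.get("https://apps18.ons.org.br/")  # illustrative URL
finally:
    session.close()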

Source code in echo_ons/sager.py
def login_ons(username: str, password: str) -> requests.Session:
    """Function that login on ONS system and returns a session object.

    This uses Selenium to log in to the ONS system and obtain the token needed to use the API. We tried using requests directly but failed, so we had to use Selenium, which is slower but works.

    Parameters
    ----------
    username : str
        Username to login on ONS system.
    password : str
        Password to login on ONS system.

    Returns
    -------
    requests.Session
        Session object with login information.

    """
    # Step 1: Start a Selenium WebDriver session
    chrome_options = Options()
    chrome_options.add_argument("--headless")  # Headless mode
    chrome_options.add_argument("--no-sandbox")  # Bypass OS security model
    chrome_options.add_argument("--disable-dev-shm-usage")  # Overcome limited resource problems
    chrome_options.add_argument("--disable-gpu")  # Disable GPU hardware acceleration
    chrome_options.add_argument("--window-size=1920x1080")  # Set window size
    chrome_options.add_argument(
        "user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36",
    )

    # try connection without installing first
    try:
        driver = webdriver.Chrome(options=chrome_options)
    except Exception:
        service = Service(ChromeDriverManager().install())
        driver = webdriver.Chrome(service=service, options=chrome_options)

    driver.set_window_size(1920, 1080)  # Example size, adjust as needed
    driver.get("https://pops.ons.org.br/ons.pop.federation")

    # Step 2: Locate and fill in the login form
    username_field = driver.find_element(By.NAME, "username")
    password_field = driver.find_element(By.NAME, "password")

    # Enter your credentials
    username_field.send_keys(username)
    password_field.send_keys(password)

    # Take a screenshot (uncomment for debugging)
    # driver.save_screenshot("screenshot.png")  # noqa: ERA001

    password_field.send_keys(Keys.RETURN)  # Submit the form

    # Step 3: Wait for the login to complete

    logger.info("Accessing SAGER")

    sleep(5)

    # a high timeout is set here as this page takes a long time to load
    # check every 1 second if viewer iframe is available and if so switch to it
    not_found_viewer = True
    amount = 0
    while not_found_viewer and amount < 60:
        # find the viewer iframe (not using WebDriverWait here because it was not working)
        try:
            driver.switch_to.frame("viewer")
            not_found_viewer = False
            sleep(5)
        except Exception:
            logger.warning("Could not find the viewer iframe. Trying to find it again.")
            amount += 1  # count attempts so the loop gives up after ~60 tries instead of spinning forever
            sleep(1)

    # Take a screenshot (uncomment for debugging)
    # driver.save_screenshot("screenshot2.png")  # noqa: ERA001

    logger.info("Found the viewer iframe and switched to it")

    # find SAGER button based on title SAGER
    sager_button = driver.find_element("css selector", 'a[title="SAGER"]')

    sager_button.click()

    logger.info("Entered in SAGER page")

    # Take a screenshot (uncomment for debugging)
    # driver.save_screenshot("screenshot3.png")  # noqa: ERA001

    # getting session from selenium driver and transporting it to requests
    session = _get_legacy_session()
    for cookie in driver.get_cookies():
        session.cookies.set(cookie["name"], cookie["value"])

    # quit the driver
    driver.quit()

    # getting token
    token_response = session.post(
        "https://pops.ons.org.br/ons.pop.federation/oauth2/token?grant_type=password&client_id=SAGER",
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
            "charset": "utf-8",
            "Origin": "https://apps18.ons.org.br",
            "Referer": "https://apps18.ons.org.br",
        },
        data={"grant_type": "password", "client_id": "SAGER"},
    )

    session.headers.update({"Authorization": f"Bearer {token_response.json()['access_token']}"})

    return session

ons_status_importer(objects=None, period=None, subperiod_length=None, overlap_interval=None)

Wrapper function to validate parameters, fetch, process and save ONS Events data.

Parameters:

  • objects

    (list[str | int] | None, default: None ) –

    The desired ONS site objects. Each entry can be either the name of an ONS object or its id number in performance_db. If not set, the function will fetch data for all objects. By default None.

  • period

    (DateTimeRange, default: None ) –

    The desired period to fetch. If not set, the function will use the period between the last registered status and now. By default None.

  • subperiod_length

    (timedelta | None, default: None ) –

    If set, the period will be split into subperiods of this length. This is useful to avoid timeouts when fetching data from the ONS API. By default 7 days.

  • overlap_interval

    (timedelta | None, default: None ) –

    If set, the start of the period will be moved back by the overlap_interval. This is useful to make sure previous data was correctly imported.

    If None, the start of the period will not be changed.

    By default None.

Returns:

  • list[str]

    List of objects that had errors during the process.
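
Example (a hedged sketch; the object names are placeholders):

from datetime import datetime, timedelta

errors = ons_status_importer(
    objects=["SITE_A", "SITE_B"],
    period=DateTimeRange(start=datetime.now() - timedelta(days=30), end=datetime.now()),
    subperiod_length=timedelta(days=7),
    overlap_interval=timedelta(days=1),
)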

Source code in echo_ons/sager.py
def ons_status_importer(
    objects: list[str | int] | None = None,
    period: DateTimeRange = None,
    subperiod_length: timedelta | None = None,
    overlap_interval: timedelta | None = None,
) -> list[str]:
    """Wrapper function to validate parameters, fetch, process and save ONS Events data.

    Parameters
    ----------
    objects : list[str | int] | None, optional
        The desired ONS site objects. Each entry can be either the name of an ONS object or its id number in performance_db. If not set, the function will fetch data for all objects. By default None.
    period : DateTimeRange, optional
        The desired period to fetch. If not set, the function will use the period between the last registered status and now. By default None.
    subperiod_length : timedelta | None, optional
        If set, the period will be split into subperiods of this length. This is useful to avoid timeouts when fetching data from the ONS API.
        By default 7 days.
    overlap_interval : timedelta | None, optional
        If set, the start of the period will be moved back by the overlap_interval. This is useful to make sure previous data was correctly imported.

        If None, the start of the period will not be changed.

        By default None.

    Returns
    -------
    list[str]
        List of objects that had errors during the process.

    """
    perfdb = PerfDB(application_name="ons_event_status_importer")

    with perfdb.conn.reconnect() as conn:
        # creating period
        if not period:
            start_date = pd.read_sql_query("SELECT date FROM ons_data_validation ORDER BY date DESC LIMIT 1", conn).iloc[0]["date"]

            period = DateTimeRange(start=start_date, end=datetime.now())
        if overlap_interval:
            period.start -= overlap_interval

        # attributes to connect to the ONS API
        data_source = perfdb.datasources.instances.get(data_source_names=["ons_sager"], output_type="dict", get_attributes=True)[
            "ons_sager"
        ]
        conn_objs = data_source["object_names"]

        # getting objects definitions
        conn_objs = perfdb.objects.instances.get(object_names=conn_objs, output_type="DataFrame", get_attributes=True)

        # simple object validation
        error_objects = []
        if not objects:
            objects = conn_objs["ons_site_id"].to_list()
        elif all(isinstance(x, str) for x in objects):
            error_objects.extend(ob for ob in objects if ob not in conn_objs.index.to_list())
            objects = conn_objs["ons_site_id"].to_list()
        elif all(isinstance(x, int) for x in objects):
            error_objects.extend(ob for ob in objects if ob not in conn_objs["id"].to_list())
        if error_objects:
            msg = f"The objects wit ons ids {', '.join(str(error_objects))} do not exist on performance_db"
            logger.critical(msg)
            return error_objects
        logger.info("Object validation: OK")

    # splitting into subperiods
    if subperiod_length is None:
        subperiod_length = timedelta(days=7)
    subperiods = period.split_multiple(separator=subperiod_length, normalize=True)

    session = None

    for subperiod in subperiods:
        subperiod_attempts = 0
        subperiod_success = False
        while not subperiod_success:
            try:
                logger.info(f"Getting data for {subperiod}")

                if session is None:
                    success = False
                    try_count = 0
                    while not success:
                        try:
                            session = login_ons(data_source["user"], data_source["password"])
                            success = True
                        except Exception as ex:
                            logger.exception("Failed to connect to ONS")
                            session = None
                            try_count += 1
                            if try_count > 3:
                                raise ConnectionError(f"Failed to connect to ONS after {try_count} attempts") from ex
                logger.info("Connected to ONS!")

                events = _get_all_status(session, objects, subperiod)
                events = _process_status_data(events)
                _save_status_data(events, conn_objs)
                subperiod_success = True
            except Exception:
                logger.exception(f"Failed to get data for {subperiod}")
                subperiod_attempts += 1
                if subperiod_attempts > 3:
                    logger.error(f"Failed to get data for {subperiod} after {subperiod_attempts} attempts. Skipping to the next subperiod.")
                    break

    return []

sager_importer(period, objects=None, recalc_features=False, calc_limitation_lost_energy=True, overlap_interval=None)

Function that gathers limitation and time series data from the ONS SAGER system and saves it to our database.

If requested, it will also recalculate the features of the imported objects.

Parameters:

  • period

    (dict[str, datetime | None] | DateTimeRange) –

    The period to fetch data from.

    If a dict is passed, it must contain the keys "start" and "end". They can be a datetime object or None. If None, they will be dynamically set using existing data from the database.

    If a DateTimeRange object is passed, the start and end datetimes will be used.

  • objects

    (list[str] | None, default: None ) –

    List with the name of the objects as in the Performance Database.

    These objects must be connected to data source "ons_sager".

    If None or [], all objects connected to the data source will be imported. By default None.

  • recalc_features

    (bool, default: False ) –

    If True, the function will recalculate the features of the ONS objects, by default False.

  • calc_limitation_lost_energy

    (bool, default: True ) –

    If True, the function will calculate the produced and lost energy during each limitation event. This will be done for each SPE and results will be saved to table ons_spe_limitations in performance_db.

    By default True.

  • overlap_interval

    (timedelta | None, default: None ) –

    If set, the start of the period will be moved back by the overlap_interval. This is useful to make sure previous data was correctly imported.

    If None, the start of the period will not be changed.

    By default None.

Returns:

  • dict[str, DateTimeRange]

    Dict with the imported objects and the periods that were imported. It will be in the format {object_name: DateTimeRange, ...}.

  • list[str]

    List with the names of the objects that failed to import.
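
Example (a hedged sketch: a dict period with None values lets the function pick the start from the database and use "now" as the end):

from datetime import timedelta

imported, failed = sager_importer(
    period={"start": None, "end": None},
    objects=[],  # empty list imports every object on the "ons_sager" data source
    recalc_features=True,
    calc_limitation_lost_energy=True,
    overlap_interval=timedelta(days=2),
)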

Source code in echo_ons/sager.py
def sager_importer(
    period: dict[str, datetime | None] | DateTimeRange,
    objects: list[str] | None = None,
    recalc_features: bool = False,
    calc_limitation_lost_energy: bool = True,
    overlap_interval: timedelta | None = None,
) -> tuple[dict[str, DateTimeRange], list[str]]:
    # sourcery skip: low-code-quality
    """Function that gathers limitation and time series data from the ONS SAGER system and saves it to our database.

    If requested, it will also recalculate the features of the imported objects.

    Parameters
    ----------
    period : dict[str, datetime | None] | DateTimeRange
        The period to fetch data from.

        If a dict is passed, it must contain the keys "start" and "end". They can be a datetime object or None. If None, they will be dynamically set using existing data from the database.

        If a DateTimeRange object is passed, the start and end datetimes will be used.
    objects : list[str] | None, optional
        List with the name of the objects as in the Performance Database.

        These objects must be connected to data source "ons_sager".

        If None or [], all objects connected to the data source will be imported. By default None.
    recalc_features : bool, optional
        If True, the function will recalculate the features of the ONS objects, by default False.
    calc_limitation_lost_energy : bool, optional
        If True, the function will calculate the produced and lost energy during each limitation event. This will be done for each SPE and results will be saved to table ons_spe_limitations in performance_db.

        By default True.
    overlap_interval : timedelta | None, optional
        If set, the start of the period will be moved back by the overlap_interval. This is useful to make sure previous data was correctly imported.

        If None, the start of the period will not be changed.

        By default None.

    Returns
    -------
    dict[str, DateTimeRange]
        Dict with the imported objects and the periods that were imported. It will be in the format {object_name: DateTimeRange, ...}.
    list[str]
        List with the names of the objects that failed to import.

    """
    if objects is None:
        objects = []
    # dict with the imported objects and their day_periods
    imported_objects = {}
    # list of objects with errors
    error_objects = []

    # getting data source information
    perfdb = PerfDB(application_name="sager_importer")
    data_source = perfdb.datasources.instances.get(data_source_names=["ons_sager"], output_type="dict", get_attributes=True)["ons_sager"]
    conn_objs = data_source["object_names"]

    # getting objects definitions
    conn_objs = perfdb.objects.instances.get(object_names=conn_objs, output_type="DataFrame", get_attributes=True)

    if not objects:
        objects = conn_objs.index.to_list()

    if missing_objects := list(set(objects) - set(conn_objs.index.to_list())):
        logger.error(f"The following objects are not connected to the data source: {missing_objects}")
        # removing missing objects from the list
        objects = list(set(objects) - set(missing_objects))
        # adding to the error_objects list
        error_objects += missing_objects

    def connect_to_ons(data_source: dict, max_try: int = 3) -> requests.Session:
        """Function to connect to the ONS API.

        Parameters
        ----------
        data_source : dict
            Dictionary containing the user and password to connect to the ONS API.
        max_try : int, optional
            Maximum number of attempts to connect to the ONS API. By default 3.

        Returns
        -------
        requests.Session
            Session object with login information.
        """
        success = False
        try_count = 0
        while not success:
            try:
                session = login_ons(data_source["user"], data_source["password"])
                success = True
            except Exception as ex:
                logger.exception("Failed to connect to ONS")
                try_count += 1
                if try_count > max_try:
                    raise ConnectionError(f"Failed to connect to ONS after {try_count} attempts") from ex
        return session

    session = connect_to_ons(data_source)

    # iterating each object
    for obj in objects:
        try:
            # getting the period to fetch data from
            this_period = period.copy()
            if isinstance(period, dict):
                if this_period["end"] is None:
                    this_period["end"] = datetime.now()
                if this_period["start"] is None:
                    # getting latest data from the database
                    query = f"SELECT start FROM v_ons_limitations_realtime WHERE object_name = '{obj}' ORDER BY start DESC LIMIT 1"  # noqa
                    with perfdb.conn.reconnect() as conn:
                        last_start = conn.read_to_pandas(query)
                        if last_start.empty:
                            this_period["start"] = datetime(2020, 1, 1)
                        else:
                            this_period["start"] = last_start.iloc[0, 0]
                # converting to DateTimeRange
                this_period = DateTimeRange.from_dict(this_period)

            # adjusting start of the period if requested
            if overlap_interval is not None:
                if not isinstance(overlap_interval, timedelta):
                    raise TypeError(f"overlap_interval must be a timedelta object, got {type(overlap_interval)}")
                this_period.start -= overlap_interval

            imported_objects[obj] = this_period
            logger.info(f"{obj} - Getting data from {this_period}")

            # splitting the period into days
            day_periods = this_period.split_multiple(separator=timedelta(days=1), normalize=True)

            # iterating each day
            for day_period in day_periods:
                try:
                    success = False
                    retry = 0
                    while not success and retry < 2:
                        try:
                            # getting the day
                            day = day_period.start
                            # getting the data
                            data = _get_data(session, day, conn_objs.loc[obj, "ons_site_id"])
                            success = True
                            # checking if the data is empty
                            if data is None:
                                continue
                            # processing the data
                            processed_data, ons_generation_data = _process_data(data, day, obj)
                            # saving the data
                            _save_data(processed_data, obj, ons_generation_data)
                        except Exception:
                            # reconnecting to ONS
                            logger.exception(f"{obj} - Could not get data for the day {day}. Will try to connect again.")
                            session = connect_to_ons(data_source)
                            retry += 1
                    if not success:
                        raise ConnectionError("Could not get data from ONS")

                except Exception:
                    logger.exception(f"{obj} - Could not get data for the day {day}")
                    error_objects.append(obj)
                    continue

        except Exception:
            logger.exception(f"{obj} Could not get data!")
            error_objects.append(obj)
            continue

    session.close()

    # recalculating features
    if recalc_features:
        from echo_energycalc import (
            CalculationHandler,  # pylint: disable=import-outside-toplevel
        )

        logger.info("Recalculating features")
        for obj, obj_period in imported_objects.items():
            logger.info(f"{obj} - Recalculating features for period {obj_period}")
            try:
                calc = CalculationHandler.from_type_defaults(object_filters={"object_names": [obj]})[0]
                errs = calc.calculate(period=obj_period)
                if errs.total_exception_count > 0:
                    raise RuntimeError(f"Errors occurred while recalculating features for {obj} and period {obj_period}")
            except Exception:
                logger.exception(f"{obj} - Could not recalculate features")
                error_objects.append(obj)
                continue

    if calc_limitation_lost_energy:
        logger.info("Calculating limitation events lost energy")
        for obj, obj_period in imported_objects.items():
            logger.info(f"{obj} - Calculating limitation events lost energy for period {obj_period}")
            try:
                if errs := calc_limitation_events_lost_energy(
                    obj_period,
                    objects=[obj],
                ):
                    raise RuntimeError(f"Errors occurred while calculating limitation events lost energy for {obj} and period {obj_period}")
            except Exception:
                logger.exception(f"{obj} - Could not calculate limitation events lost energy")
                error_objects.append(obj)
                continue

    error_objects = sorted(set(error_objects))
    return imported_objects, error_objects