Skip to content

Dataset Resource Values

CceeOpenDataDatasetsResourcesValues(ccee)

Class used for handling CCEE values from dataset resources. Can be accessed via ccee.opendata.datasets.resources.values.

Source code in echo_ons/ccee_root.py
def __init__(self, ccee: e_o.Ccee) -> None:
    """Keep a reference to the top-level Ccee object on this instance.

    Parameters
    ----------
    ccee : Ccee
        Top-level object that carries all functionality and the connection handler.
        It is stored on the instance as ``_ccee``.
    """
    self._ccee: e_o.Ccee = ccee

get(dataset_name, resource_names=None, filters=None, output_type='DataFrame')

Gets the values from a dataset resource.

This uses the CKAN API to get the values from a dataset resource. More documentation at https://docs.ckan.org/en/latest/maintaining/datastore.html#ckanext.datastore.logic.action.datastore_search

Keep in mind that the CCEE Open Data portal launched around June 2024, so data before that is not available.

Parameters:

  • dataset_name

    (str) –

    Name of the dataset to get the resources from.

  • resource_names

    (list[str], default: None ) –

    List of resource names to get the values from. If None, all resources of the dataset are fetched. By default None.

  • filters

    (dict[str, Any], default: None ) –

    Filters to apply to the data. It is a dictionary with the keys being the field names and the values being the values to filter by. The values can be lists to filter by multiple values.

    If the dataset supports filtering, this will be applied in the API call, making it faster. Otherwise, the entire dataset will be downloaded and filtering will be done locally.

    If not set, no filtering is done. By default None

  • output_type

    (Literal['dict', 'DataFrame'], default: 'DataFrame' ) –

    Type of the output, by default "DataFrame"

Returns:

  • list[dict[str, Any]]

    If output_type is "dict", returns a list of dictionaries with the results.

  • DataFrame

    If output_type is "DataFrame", returns a DataFrame built from the combined records of all requested resources, one row per record.

Source code in echo_ons/ccee_opendata_datasets_resources_values.py
@validate_call
def get(
    self,
    dataset_name: str,
    resource_names: list[str] | None = None,
    filters: dict[str, Any] | None = None,
    output_type: Literal["dict", "DataFrame"] = "DataFrame",
) -> list[dict[str, Any]] | DataFrame:
    """Gets the values from a dataset resource.

    This uses ckan API to get the values from a dataset resource. More documentation at https://docs.ckan.org/en/latest/maintaining/datastore.html#ckanext.datastore.logic.action.datastore_search

    Keep in mind that CCEE Open Data portal launched at around June of 2024, so data before that is not available.

    Parameters
    ----------
    dataset_name : str
        Name of the dataset to get the resources from.
    resource_names : list[str], optional
        List of resource names to get the values from. If None, gets all resources, by default None
    filters : dict[str, Any], optional
        Filters to apply to the data. It is a dictionary with the keys being the field names and the values being the values to filter by. The values can be lists to filter by multiple values.

        If the dataset supports filtering, this will be applied in the API call, making it faster. Otherwise, the entire dataset will be downloaded and filtering will be done locally.

        If not set, no filtering is done. By default None
    output_type : Literal["dict", "DataFrame"], optional
        Type of the output, by default "DataFrame"

    Returns
    -------
    list[dict[str, Any]]
        If output_type is "dict", returns a list of dictionaries with the results.
    DataFrame
        If output_type is "DataFrame", returns a DataFrame with the results. The index is the resource names.
    """
    # searching for the dataset
    dataset = self._ccee.opendata.datasets.search("name", dataset_name, output_type="dict")
    if dataset_name not in dataset:
        raise ValueError(f"Dataset {dataset_name} not found")

    # getting the resources
    resources = self._ccee.opendata.datasets.resources.get(dataset_name, output_type="dict")
    if resource_names:
        wrong_names = set(resource_names) - set(resources.keys())
        if wrong_names:
            raise ValueError(f"Resource(s) {wrong_names} not found")
    else:
        resource_names = list(resources.keys())

    endpoint = "api/3/action/datastore_search"

    results = []
    for resource_name in resource_names:
        resource_id = resources[resource_name]["id"]

        # checking if search is enabled (parameter datastore_active must be True)
        searcheable = resources[resource_name].get("datastore_active", False)

        if searcheable:
            finished = False
            offset = 0
            this_results = []
            n_per_request = 10000
            while not finished:
                args = {
                    "resource_id": resource_id,
                    "limit": n_per_request,
                    "offset": offset,
                    "filters": filters,
                }
                response = self._ccee._opendata_conn.post(endpoint, json=args)  # noqa: SLF001
                if not response.status_code == 200:
                    raise ValueError(f"Failed to get data from CCEE open data. {response.text}")
                records = response.json()["result"]["records"]
                this_results.extend(records)
                if len(records) < n_per_request:
                    finished = True
                else:
                    offset += n_per_request

            results.extend(this_results)

        else:
            logger.warning(f"Resource {resource_name} is not searcheable. Getting entire resource file.")

            file_format = resources[resource_name].get("format", "").lower()

            url = resources[resource_name].get("url", "")
            if not url:
                raise ValueError(f"Resource {resource_name} does not have a URL")

            match file_format:
                case "gzip" | "csv":
                    df = pl.read_csv(url, separator=";", infer_schema_length=None)
                case _:
                    raise ValueError(f"Resource {resource_name} has an unsupported format: {file_format}")

            if filters:
                for key, value in filters.items():
                    df = df.filter(pl.col(key).is_in(value)) if isinstance(value, list) else df.filter(pl.col(key) == value)

            results.extend(df.to_dicts())

    if output_type == "DataFrame":
        results = DataFrame(results)

    return results