Source code for pycmor.std_lib.global_attributes

import datetime
import re
import uuid
from abc import abstractmethod

import xarray as xr

from ..core.factory import MetaFactory


[docs] class GlobalAttributes(metaclass=MetaFactory):
[docs] @abstractmethod def global_attributes(self): raise NotImplementedError()
[docs] @abstractmethod def subdir_path(self): raise NotImplementedError()
[docs] class CMIP7GlobalAttributes(GlobalAttributes):
[docs] def global_attributes(self): raise NotImplementedError()
[docs] def subdir_path(self): raise NotImplementedError()
[docs] class CMIP6GlobalAttributes(GlobalAttributes): def __init__(self, drv, cv, rule_dict): self.drv = drv self.cv = cv self.rule_dict = rule_dict @property def required_global_attributes(self): return self.cv["required_global_attributes"]
[docs] def global_attributes(self) -> dict: d = {} for key in self.required_global_attributes: func = getattr(self, f"get_{key}") d[key] = func() return d
[docs] def subdir_path(self) -> str: mip_era = self.get_mip_era() activity_id = self.get_activity_id() institution_id = self.get_institution_id() source_id = self.get_source_id() experiment_id = self.get_experiment_id() member_id = self.get_variant_label() sub_experiment_id = self.get_sub_experiment_id() if sub_experiment_id != "none": member_id = f"{member_id}-{sub_experiment_id}" table_id = self.get_table_id() variable_id = self.get_variable_id() grid_label = self.get_grid_label() version = f"v{datetime.datetime.today().strftime('%Y%m%d')}" directory_path = f"{mip_era}/{activity_id}/{institution_id}/{source_id}/{experiment_id}/{member_id}/{table_id}/{variable_id}/{grid_label}/{version}" # noqa: E501 return directory_path
[docs] def _variant_label_components(self, label: str): pattern = re.compile( r"r(?P<realization_index>\d+)" r"i(?P<initialization_index>\d+)" r"p(?P<physics_index>\d+)" r"f(?P<forcing_index>\d+)" r"$" ) d = pattern.match(label) if d is None: raise ValueError( f"`label` must be of the form 'r<int>i<int>p<int>f<int>', Got: {label}" ) d = {name: int(val) for name, val in d.groupdict().items()} return d
[docs] def get_variant_label(self): return self.rule_dict["variant_label"]
[docs] def get_physics_index(self): variant_label = self.get_variant_label() components = self._variant_label_components(variant_label) return components["physics_index"]
[docs] def get_forcing_index(self): variant_label = self.get_variant_label() components = self._variant_label_components(variant_label) return components["forcing_index"]
[docs] def get_initialization_index(self): variant_label = self.get_variant_label() components = self._variant_label_components(variant_label) return components["initialization_index"]
[docs] def get_realization_index(self): variant_label = self.get_variant_label() components = self._variant_label_components(variant_label) return components["realization_index"]
[docs] def get_source_id(self): return self.rule_dict["source_id"]
[docs] def get_source(self): # TODO: extend this to include all model components model_component = self.get_realm() source_id = self.get_source_id() cv_source_id = self.cv["source_id"][source_id] release_year = cv_source_id["release_year"] # return f"{source_id} ({release_year})" return f"{model_component} ({release_year})"
[docs] def get_institution_id(self): source_id = self.get_source_id() cv_source_id = self.cv["source_id"][source_id] institution_ids = cv_source_id["institution_id"] if len(institution_ids) > 1: user_institution_id = self.rule_dict.get("institution_id", None) if user_institution_id: if user_institution_id not in institution_ids: raise ValueError( f"Institution ID '{user_institution_id}' is not valid. " f"Allowed values: {institution_ids}" ) return user_institution_id raise ValueError( f"Multiple institutions are not supported, got: {institution_ids}" ) return institution_ids[0]
[docs] def get_institution(self): institution_id = self.get_institution_id() return self.cv["institution_id"][institution_id]
[docs] def get_realm(self): # `realm`` from table header turns out to be incorrect in some of the cases. # So instead read it from the user input to ensure the correct value # # return self.drv.table_header.realm model_component = self.rule_dict.get("model_component", None) if model_component is None: model_component = self.drv.model_component if len(model_component.split()) > 1: model_component = self.drv.table_header.realm return model_component
[docs] def get_grid_label(self): return self.rule_dict["grid_label"]
[docs] def get_grid(self): source_id = self.get_source_id() cv_source_id = self.cv["source_id"][source_id] model_component = self.get_realm() grid_description = cv_source_id["model_component"][model_component][ "description" ] if grid_description == "none": # check if user has provided grid description user_grid_description = self.rule_dict.get( "description", self.rule_dict.get("grid", None) ) if user_grid_description: grid_description = user_grid_description return grid_description
[docs] def get_nominal_resolution(self): source_id = self.get_source_id() cv_source_id = self.cv["source_id"][source_id] model_component = self.get_realm() cv_model_component = cv_source_id["model_component"][model_component] if "native_nominal_resolution" in cv_model_component: nominal_resolution = cv_model_component["native_nominal_resolution"] if "native_ominal_resolution" in cv_model_component: nominal_resolution = cv_model_component["native_ominal_resolution"] if nominal_resolution == "none": # check if user has provided nominal resolution user_nominal_resolution = self.rule_dict.get( "nominal_resolution", self.rule_dict.get("resolution", None) ) if user_nominal_resolution: nominal_resolution = user_nominal_resolution return nominal_resolution
[docs] def get_license(self): institution_id = self.get_institution_id() source_id = self.get_source_id() cv_source_id = self.cv["source_id"][source_id] license_id = cv_source_id["license_info"]["id"] license_url = self.cv["license"]["license_options"][license_id]["license_url"] license_id = self.cv["license"]["license_options"][license_id]["license_id"] license_text = self.cv["license"]["license"] # make placeholders in license text license_text = re.sub(r"<.*?>", "{}", license_text) further_info_url = self.rule_dict.get("further_info_url", None) if further_info_url is None: license_text = re.sub(r"\[.*?\]", "", license_text) license_text = license_text.format(institution_id, license_id, license_url) else: license_text = license_text.format( institution_id, license_id, license_url, further_info_url ) return license_text
[docs] def get_experiment_id(self): return self.rule_dict["experiment_id"]
[docs] def get_experiment(self): experiment_id = self.get_experiment_id() return self.cv["experiment_id"][experiment_id]["experiment"]
[docs] def get_activity_id(self): experiment_id = self.get_experiment_id() cv_experiment_id = self.cv["experiment_id"][experiment_id] activity_ids = cv_experiment_id["activity_id"] if len(activity_ids) > 1: user_activity_id = self.rule_dict.get("activity_id", None) if user_activity_id: if user_activity_id not in activity_ids: raise ValueError( f"Activity ID '{user_activity_id}' is not valid. " f"Allowed values: {activity_ids}" ) return user_activity_id raise ValueError( f"Multiple activities are not supported, got: {activity_ids}" ) return activity_ids[0]
[docs] def get_sub_experiment_id(self): experiment_id = self.get_experiment_id() cv_experiment_id = self.cv["experiment_id"][experiment_id] sub_experiment_ids = cv_experiment_id["sub_experiment_id"] sub_experiment_id = " ".join(sub_experiment_ids) return sub_experiment_id
[docs] def get_sub_experiment(self): sub_experiment_id = self.get_sub_experiment_id() if sub_experiment_id == "none": sub_experiment = "none" else: sub_experiment = sub_experiment_id.split()[0] return sub_experiment
[docs] def get_source_type(self): experiment_id = self.get_experiment_id() cv_experiment_id = self.cv["experiment_id"][experiment_id] source_type = " ".join(cv_experiment_id["required_model_components"]) return source_type
[docs] def get_table_id(self): return self.drv.table_header.table_id
[docs] def get_mip_era(self): return self.drv.table_header.mip_era
[docs] def get_frequency(self): return self.drv.frequency
[docs] def get_Conventions(self): header = self.drv.table_header return header.Conventions
[docs] def get_product(self): header = self.drv.table_header return header.product
[docs] def get_data_specs_version(self): header = self.drv.table_header return str(header.data_specs_version)
[docs] def get_creation_date(self): return self.rule_dict["creation_date"]
[docs] def get_tracking_id(self): return "hdl:21.14100/" + str(uuid.uuid4())
[docs] def get_variable_id(self): return self.rule_dict["cmor_variable"]
[docs] def get_further_info_url(self): mip_era = self.get_mip_era() institution_id = self.get_institution_id() source_id = self.get_source_id() experiment_id = self.get_experiment_id() sub_experiment_id = self.get_sub_experiment_id() variant_label = self.get_variant_label() return ( f"https://furtherinfo.es-doc.org/" f"{mip_era}.{institution_id}.{source_id}.{experiment_id}.{sub_experiment_id}.{variant_label}" )
[docs] def set_global_attributes(ds, rule): """Set global attributes for the dataset""" if isinstance(ds, xr.DataArray): ds = ds.to_dataset() ds.attrs.update(rule.ga.global_attributes()) return ds