import copy
import datetime
import pathlib
import re
import typing
import warnings
import yaml
from ..data_request.table import DataRequestTable
from ..data_request.variable import DataRequestVariable
from . import pipeline
from .gather_inputs import InputFileCollection
from .logging import logger
class Rule:
    """
    Bundle of everything needed to produce one CMOR variable.

    A rule holds the input file collections, the CMOR variable name, the
    pipelines to run, and the data request tables/variables attached to it.
    Any additional keyword arguments given at construction time are stored
    as plain attributes on the instance.
    """

    def __init__(
        self,
        *,
        name: typing.Optional[str] = None,
        inputs: typing.Optional[typing.List[dict]] = None,
        cmor_variable: str,
        pipelines: typing.Optional[typing.List[pipeline.Pipeline]] = None,
        tables: typing.Optional[typing.List[DataRequestTable]] = None,
        data_request_variables: typing.Optional[
            typing.List[DataRequestVariable]
        ] = None,
        **kwargs,
    ):
        """
        Initialize a Rule object.

        This method can only be called with keyword arguments.

        Parameters
        ----------
        name : str, optional
            A name for the rule.
        inputs : list of dicts for InputFileCollection
            Each dictionary is passed to ``InputFileCollection.from_dict``.
            Dictionaries should contain the keys "path" and "pattern".
        cmor_variable : str
            The CMOR variable name. This is the name of the variable as it
            should appear in the CMIP archive.
        pipelines : list of Pipeline objects, optional
            The pipelines that define the transformations to be applied to
            the data. Pipeline names (strings) are also accepted and can be
            resolved later with :meth:`match_pipelines`. If empty or omitted,
            a single ``pipeline.DefaultPipeline()`` is used.
        tables : list of DataRequestTable objects, optional
            Data request tables associated with this rule.
        data_request_variables : list of DataRequestVariable objects, optional
            The DataRequestVariables this rule should create.
        **kwargs
            Every remaining keyword argument is attached to the object as an
            attribute of the same name.
        """
        self.name = name
        self.inputs = [
            InputFileCollection.from_dict(inp_dict) for inp_dict in (inputs or [])
        ]
        self.cmor_variable = cmor_variable
        self.pipelines = pipelines or [pipeline.DefaultPipeline()]
        self.tables = tables or []
        self.data_request_variables = data_request_variables or []
        # NOTE(PG): I'm not sure I really like this part. It is too magical and
        # makes the object's public API unclear.
        # Attach all keyword arguments to the object. This runs after the
        # named attributes above, so a kwarg with a clashing name would win.
        for key, value in kwargs.items():
            setattr(self, key, value)
        # Internal flags (set last so kwargs cannot pre-seed them):
        self._pipelines_are_mapped = False

    def __getstate__(self):
        """Custom pickling of a Rule: a shallow copy of the instance dict."""
        state = self.__dict__.copy()
        return state

    def get(self, key, default=None):
        """
        Get an attribute from the Rule object.

        Useful for passing the Rule object to other functions that may not
        know the current structure, e.g. when calling Pipeline steps.

        Parameters
        ----------
        key : str
            The name of the attribute to get.
        default : Any, optional
            The value to return if the attribute does not exist.

        Returns
        -------
        value : Any
            The value of the attribute, or the default value if the attribute
            does not exist.
        """
        return getattr(self, key, default)

    def set(self, key, value, force=False, warn=True):
        """
        Set an attribute on the object, guarding against accidental overwrites.

        Parameters
        ----------
        key : str
            The name of the attribute to set.
        value : Any
            The value to set for the attribute.
        force : bool, optional
            If True, the attribute is assigned unconditionally, even if it
            already exists. Defaults to False.
        warn : bool, optional
            Only consulted when the attribute already exists and ``force`` is
            False. If True (default), a warning is issued and the existing
            value is left untouched. If False, an AttributeError is raised.

        Returns
        -------
        None
            Like the built-in ``setattr``, nothing is returned.

        Raises
        ------
        AttributeError
            If the attribute already exists and force and warn are both False.
        """
        if hasattr(self, key) and not force:
            message = f"Attribute {key} already exists. Use force=True to overwrite."
            if not warn:
                raise AttributeError(message)
            warnings.warn(message)
            # The warning tells the caller to use force=True to overwrite,
            # so the existing value must be kept here.
            return None
        return setattr(self, key, value)

    def __str__(self):
        return f"Rule for {self.cmor_variable} with input patterns {self.input_patterns} and pipelines {self.pipelines}"

    def match_pipelines(self, pipelines, force=False):
        """
        Match the pipelines in the rule with the pipelines in the configuration.

        Entries of ``self.pipelines`` that are strings are looked up by name
        among ``pipelines``; entries that already are Pipeline objects are
        kept as they are.

        Parameters
        ----------
        pipelines : list of pipeline.Pipeline
            Available pipelines to use. They are indexed by their ``name``.
        force : bool, optional
            If True, the pipelines will be remapped even if they were already
            mapped.

        Returns
        -------
        list of pipeline.Pipeline
            The rule's pipelines after matching.

        Raises
        ------
        KeyError
            If a pipeline name is not among the available pipelines.
        TypeError
            If an entry is neither a string nor a pipeline.Pipeline object.

        Mutates
        -------
        self.pipelines : list of str --> list of pipeline.Pipeline objects
            ``self.pipelines`` will be replaced from a list of strings to a
            list of Pipeline objects. The order of the pipelines will be
            preserved.
        """
        if self._pipelines_are_mapped and not force:
            logger.debug("Pipelines already mapped, nothing to do")
            return self.pipelines
        known_pipelines = {p.name: p for p in pipelines}
        logger.debug("The following pipelines are known:")
        for pl_name, pl in known_pipelines.items():
            logger.debug(f"{pl_name}: {pl}")
        matched_pipelines = []
        for pl in self.pipelines:
            logger.debug(f"Working on: {pl}")
            if isinstance(pl, pipeline.Pipeline):
                # Pipeline was already matched
                matched_pipelines.append(pl)
            elif isinstance(pl, str):
                # Pipeline name:
                if pl not in known_pipelines:
                    logger.error(f"Pipeline {pl} is not among the known pipelines")
                    raise KeyError(
                        f"Unknown pipeline {pl!r}; known pipelines: {sorted(known_pipelines)}"
                    )
                matched_pipelines.append(known_pipelines[pl])
            else:
                logger.error(f"No known way to match the pipeline {pl}")
                raise TypeError(f"{pl} must be a string or a pipeline.Pipeline object!")
        self.pipelines = matched_pipelines
        self._pipelines_are_mapped = True
        # Return the list on this path too, so both exits agree.
        return self.pipelines

    @classmethod
    def from_dict(cls, data):
        """
        Build a rule object from a dictionary.

        The dictionary must have the keys "inputs" and "cmor_variable", and
        may have "name" and "pipelines". All remaining keys are forwarded to
        the constructor as extra keyword arguments. Note that the ``"inputs"``
        key should contain a list of dictionaries that can be used to build
        InputFileCollection objects.

        Parameters
        ----------
        data : dict
            A dictionary containing the rule data. It is not modified.

        Returns
        -------
        Rule
            The newly constructed rule.

        Raises
        ------
        KeyError
            If "inputs" or "cmor_variable" is missing.
        """
        # Work on a shallow copy so the caller's dictionary keeps its keys.
        remaining = dict(data)
        name = remaining.pop("name", None)
        inputs = remaining.pop("inputs")
        cmor_variable = remaining.pop("cmor_variable")
        pipelines = remaining.pop("pipelines", [])
        return cls(
            name=name,
            inputs=inputs,
            cmor_variable=cmor_variable,
            pipelines=pipelines,
            **remaining,
        )

    @classmethod
    def from_yaml(cls, yaml_str):
        """Wrapper around ``from_dict`` for initializing from YAML"""
        return cls.from_dict(yaml.safe_load(yaml_str))

    def add_table(self, tbl):
        """Add a table to the rule; ``None`` entries are dropped."""
        self.tables.append(tbl)
        self.tables = [t for t in self.tables if t is not None]

    def remove_table(self, tbl):
        """Remove a table from the rule (``ValueError`` if it is absent)."""
        self.tables.remove(tbl)

    def add_data_request_variable(self, drv):
        """Add a data request variable to the rule; ``None`` entries are dropped."""
        self.data_request_variables.append(drv)
        # Filter out Nones
        self.data_request_variables = [
            v for v in self.data_request_variables if v is not None
        ]

    def remove_data_request_variable(self, drv):
        """Remove a data request variable from the rule (``ValueError`` if absent)."""
        self.data_request_variables.remove(drv)

    @property
    def input_patterns(self):
        """Return a list of compiled regex patterns for the input files."""
        return [re.compile(f"{inp.path}/{inp.pattern}") for inp in self.inputs]

    def clone(self):
        """Creates a deep copy of this rule object as it is currently configured."""
        return copy.deepcopy(self)

    def expand_drvs(self):
        """
        Depluralize the rule by creating a new rule for each DataRequestVariable.

        For each DataRequestVariable (``drv``) of this rule, the rule itself
        and the drv are cloned, and the cloned drv is set as the only drv of
        the cloned rule. This rule is left unchanged.

        Returns
        -------
        list
            A list of cloned rule objects, each containing a single
            DataRequestVariable. Empty if the rule has no drvs.
        """
        clones = []
        for drv in self.data_request_variables:
            rule_clone = self.clone()
            drv_clone = drv.clone()
            # FIXME: This is bad. I need to extract one rule for each table,
            # but the newer API doesn't work as cleanly here...
            rule_clone.data_request_variables = [drv_clone]
            clones.append(rule_clone)
        return clones

    def depluralize_drvs(self):
        """
        Depluralizes Data Request Variables to just a single entry.

        Replaces the ``data_request_variables`` list (which must hold exactly
        one element) with a singular ``data_request_variable`` attribute.
        """
        assert len(self.data_request_variables) == 1, (
            "depluralize_drvs needs exactly one data request variable, "
            f"found {len(self.data_request_variables)}"
        )
        self.data_request_variable = self.data_request_variables[0]
        del self.data_request_variables

    def global_attributes_set_on_rule(self):
        """
        Collect the global attributes that are defined directly on the rule.

        Returns
        -------
        dict
            One entry per known attribute name (``None`` where the rule does
            not have that attribute), plus ``creation_date`` formatted as
            ``%Y-%m-%dT%H:%M:%SZ`` in UTC.
        """
        attrs = (
            "source_id",
            "grid_label",
            "cmor_variable",
            "variant_label",
            "experiment_id",
            "activity_id",  # optional
            "institution_id",  # optional
            "model_component",  # optional
            "further_info_url",  # optional
        )
        # attribute `creation_date` is the time-stamp of inputs directory
        utc = datetime.timezone.utc
        try:
            afile = next(
                f for file_collection in self.inputs for f in file_collection.files
            )
            afile = pathlib.Path(afile)
            # The format below ends in a literal "Z", so the value must be UTC.
            dir_timestamp = datetime.datetime.fromtimestamp(
                afile.parent.stat().st_ctime, tz=utc
            )
        except (StopIteration, FileNotFoundError):
            # No input files, so use the current time -- this is a fallback
            # triggered for test cases. StopIteration covers a rule with no
            # inputs or with collections that list no files.
            dir_timestamp = datetime.datetime.now(tz=utc)
        time_format = "%Y-%m-%dT%H:%M:%SZ"
        creation_date = dir_timestamp.strftime(time_format)
        result = {attr: getattr(self, attr, None) for attr in attrs}
        result["creation_date"] = creation_date
        return result

    def create_global_attributes(self, GlobalAttributesClass):
        """
        Instantiate ``GlobalAttributesClass`` and store the result as ``self.ga``.

        The class is called with ``self.data_request_variable``,
        ``self.controlled_vocabularies`` and the dictionary returned by
        :meth:`global_attributes_set_on_rule`, in that order.

        NOTE(review): neither attribute is set by ``__init__``.
        ``data_request_variable`` is created by :meth:`depluralize_drvs`;
        ``controlled_vocabularies`` is presumably attached by the caller
        beforehand -- confirm.
        """
        self.ga = GlobalAttributesClass(
            self.data_request_variable,
            self.controlled_vocabularies,
            self.global_attributes_set_on_rule(),
        )